You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/08/04 19:44:38 UTC
tez git commit: TEZ-2647. Add input causality dependency for attempts (bikas)
Repository: tez
Updated Branches:
refs/heads/master 73da831e8 -> 6a99798f2
TEZ-2647. Add input causality dependency for attempts (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6a99798f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6a99798f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6a99798f
Branch: refs/heads/master
Commit: 6a99798f250310f8bd819aae89123e6146364983
Parents: 73da831
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Aug 4 10:42:40 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Aug 4 10:43:09 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/common/ATSConstants.java | 2 ++
.../dag/app/TaskAttemptListenerImpTezDag.java | 5 ++-
.../org/apache/tez/dag/app/dag/TaskAttempt.java | 3 ++
.../org/apache/tez/dag/app/dag/impl/Edge.java | 13 +++++---
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 21 ++++++++++--
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 17 +++++++++-
.../tez/dag/app/dag/impl/VertexManager.java | 3 +-
.../events/TaskAttemptFinishedEvent.java | 27 +++++++++++++++-
.../VertexRecoverableEventsGeneratedEvent.java | 3 +-
.../impl/HistoryEventJsonConversion.java | 4 +++
tez-dag/src/main/proto/HistoryEvents.proto | 3 ++
.../apache/tez/dag/app/MockDAGAppMaster.java | 10 +++---
.../tez/dag/app/TestMockDAGAppMaster.java | 6 ++--
.../app/TestTaskAttemptListenerImplTezDag.java | 5 ++-
.../app/dag/impl/TestTaskAttemptRecovery.java | 8 +++--
.../tez/dag/app/dag/impl/TestTaskRecovery.java | 34 ++++++++++----------
.../tez/dag/app/dag/impl/TestVertexImpl.java | 6 +++-
.../TestHistoryEventsProtoConversion.java | 13 ++++++--
.../impl/TestHistoryEventJsonConversion.java | 2 +-
.../parser/datamodel/TaskAttemptInfo.java | 15 +++++++++
.../apache/tez/history/TestATSFileParser.java | 24 +++++++++++++-
.../ats/HistoryEventTimelineConversion.java | 6 +++-
.../ats/TestHistoryEventTimelineConversion.java | 9 ++++--
.../apache/tez/runtime/api/impl/TezEvent.java | 17 ++++++++++
25 files changed, 207 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 59307b7..1f85954 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
TEZ-2565. Consider scanning unfinished tasks in VertexImpl::constructStatistics to reduce merge overhead.
TEZ-2468. Change the minimum Java version to Java 7.
TEZ-2646. Add scheduling casual dependency for attempts
+ TEZ-2647. Add input causality dependency for attempts
ALL CHANGES:
TEZ-2172. FetcherOrderedGrouped using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation.
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index 4bf9f6d..1568b96 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -81,6 +81,8 @@ public class ATSConstants {
public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL";
public static final String COMPLETED_LOGS_URL = "completedLogsURL";
public static final String EXIT_STATUS = "exitStatus";
+ public static final String LAST_DATA_EVENT_TIME = "lastDataEventTime";
+ public static final String LAST_DATA_EVENT_SOURCE_TA = "lastDataEventSourceTA";
public static final String UPDATED_EDGE_MANAGERS = "updatedEdgeManagers";
public static final String SCHEDULING_CAUSAL_ATTEMPT = "schedulingCausalAttempt";
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index fe92f3a..b7896c1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.commons.collections4.ListUtils;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventType;
import org.slf4j.Logger;
@@ -424,6 +423,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
" events: " + (inEvents != null? inEvents.size() : -1));
}
+ long currTime = context.getClock().getTime();
List<TezEvent> otherEvents = new ArrayList<TezEvent>();
// route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events
// (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT)
@@ -431,6 +431,9 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
// 1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent
// 2. TaskStatusEvent is handled before TaskAttemptFinishedEvent
for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
+ // for now, set the event time on the AM when it is received.
+ // this avoids any time disparity between machines.
+ tezEvent.setEventReceivedTime(currTime);
final EventType eventType = tezEvent.getEventType();
if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index 6c85cc2..4360cc3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -34,6 +34,7 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.impl.TezEvent;
/**
* Read only view of TaskAttempt.
@@ -79,6 +80,8 @@ public interface TaskAttempt {
float getProgress();
TaskAttemptState getState();
TaskAttemptState getStateNoLock();
+
+ void setLastEventSent(TezEvent lastEventSent);
/**
* Has attempt reached the final state or not.
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index ddccf8d..da74a46 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -377,7 +377,7 @@ public class Edge {
EventMetaData srcInfo = tezEvent.getSourceInfo();
for (DataMovementEvent dmEvent : compEvent.getEvents()) {
- TezEvent newEvent = new TezEvent(dmEvent, srcInfo);
+ TezEvent newEvent = new TezEvent(dmEvent, srcInfo, tezEvent.getEventReceivedTime());
sendTezEventToDestinationTasks(newEvent);
}
}
@@ -406,7 +406,7 @@ public class Edge {
InputFailedEvent ifEvent = ((InputFailedEvent) event);
e = InputFailedEvent.create(inputIndex, ifEvent.getVersion());
}
- tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+ tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(), tezEvent.getEventReceivedTime());
tezEventToSend.setDestinationInfo(destinationMetaInfo);
// cache the event object per input because are unique per input index
inputIndicesWithEvents.put(inputIndex, tezEventToSend);
@@ -553,7 +553,8 @@ public class Edge {
DataMovementEvent e = compEvent.expand(sourceIndices[numEventsDone],
targetIndices[numEventsDone]);
numEventsDone++;
- TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+ TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(),
+ tezEvent.getEventReceivedTime());
tezEventToSend.setDestinationInfo(destinationMetaInfo);
listToAdd.add(tezEventToSend);
}
@@ -585,7 +586,8 @@ public class Edge {
while (numEventsDone < numEvents && listSize++ < listMaxSize) {
InputFailedEvent e = ifEvent.makeCopy(targetIndices[numEventsDone]);
numEventsDone++;
- TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+ TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(),
+ tezEvent.getEventReceivedTime());
tezEventToSend.setDestinationInfo(destinationMetaInfo);
listToAdd.add(tezEventToSend);
}
@@ -617,7 +619,8 @@ public class Edge {
while (numEventsDone < numEvents && listSize++ < listMaxSize) {
DataMovementEvent e = dmEvent.makeCopy(targetIndices[numEventsDone]);
numEventsDone++;
- TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+ TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(),
+ tezEvent.getEventReceivedTime());
tezEventToSend.setDestinationInfo(destinationMetaInfo);
listToAdd.add(tezEventToSend);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 40636dd..ebf7c58 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -147,6 +147,10 @@ public class TaskAttemptImpl implements TaskAttempt,
private final Vertex vertex;
@VisibleForTesting
+ long lastDataEventTime;
+ TezTaskAttemptID lastDataEventSourceTA = null;
+
+ @VisibleForTesting
TaskAttemptStatus reportedStatus;
private DAGCounter localityCounter;
@@ -754,6 +758,8 @@ public class TaskAttemptImpl implements TaskAttempt,
: TaskAttemptTerminationCause.UNKNOWN_ERROR;
this.diagnostics.add(tEvent.getDiagnostics());
this.recoveredState = tEvent.getState();
+ this.lastDataEventTime = tEvent.getLastDataEventTime();
+ this.lastDataEventSourceTA = tEvent.getLastDataEventSourceTA();
sendEvent(createDAGCounterUpdateEventTAFinished(this, tEvent.getState()));
return recoveredState;
}
@@ -969,7 +975,7 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
attemptId, getVertex().getName(), getLaunchTime(),
getFinishTime(), TaskAttemptState.SUCCEEDED, null,
- "", getCounters());
+ "", getCounters(), lastDataEventTime, lastDataEventSourceTA);
// FIXME how do we store information regd completion events
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -982,7 +988,8 @@ public class TaskAttemptImpl implements TaskAttempt,
clock.getTime(), state,
terminationCause,
StringUtils.join(
- getDiagnostics(), LINE_SEPARATOR), getCounters());
+ getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEventTime,
+ lastDataEventSourceTA);
// FIXME how do we store information regd completion events
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1485,7 +1492,7 @@ public class TaskAttemptImpl implements TaskAttempt,
new EventMetaData(EventProducerConsumerType.SYSTEM,
vertex.getName(),
edgeVertex.getName(),
- getID())));
+ getID()), appContext.getClock().getTime()));
}
sendEvent(new VertexEventRouteEvent(vertex.getVertexId(), tezIfEvents));
}
@@ -1562,4 +1569,12 @@ public class TaskAttemptImpl implements TaskAttempt,
public String toString() {
return getID().toString();
}
+
+
+ @Override
+ public void setLastEventSent(TezEvent lastEventSent) {
+ // task attempt id may be null for input data information events
+ this.lastDataEventSourceTA = lastEventSent.getSourceInfo().getTaskAttemptID();
+ this.lastDataEventTime = lastEventSent.getEventReceivedTime();
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 3888c7a..9519fa9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -4092,7 +4092,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
@Override
public TaskAttemptEventInfo getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
int fromEventId, int preRoutedFromEventId, int maxEvents) {
- ArrayList<TezEvent> events = getTask(attemptID.getTaskID()).getTaskAttemptTezEvents(
+ Task task = getTask(attemptID.getTaskID());
+ ArrayList<TezEvent> events = task.getTaskAttemptTezEvents(
attemptID, preRoutedFromEventId, maxEvents);
int nextPreRoutedFromEventId = preRoutedFromEventId + events.size();
int nextFromEventId = fromEventId;
@@ -4192,6 +4193,20 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
} finally {
onDemandRouteEventsReadLock.unlock();
}
+ if (!events.isEmpty()) {
+ for (int i=(events.size() - 1); i>=0; --i) {
+ TezEvent lastEvent = events.get(i);
+ // record the last event sent by the AM to the task
+ EventType lastEventType = lastEvent.getEventType();
+ // if the following changes then critical path logic/recording may need revision
+ if (lastEventType == EventType.COMPOSITE_DATA_MOVEMENT_EVENT ||
+ lastEventType == EventType.DATA_MOVEMENT_EVENT ||
+ lastEventType == EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) {
+ task.getAttempt(attemptID).setLastEventSent(lastEvent);
+ break;
+ }
+ }
+ }
return new TaskAttemptEventInfo(nextFromEventId, events, nextPreRoutedFromEventId);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index caa3432..64eb80f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -230,11 +230,12 @@ public class VertexManager {
Collection<InputDataInformationEvent> events) {
checkAndThrowIfDone();
verifyIsRootInput(inputName);
+ final long currTime = appContext.getClock().getTime();
Collection<TezEvent> tezEvents = Collections2.transform(events,
new Function<InputDataInformationEvent, TezEvent>() {
@Override
public TezEvent apply(InputDataInformationEvent riEvent) {
- TezEvent tezEvent = new TezEvent(riEvent, rootEventSourceMetadata);
+ TezEvent tezEvent = new TezEvent(riEvent, rootEventSourceMetadata, currTime);
tezEvent.setDestinationInfo(getDestinationMetaData(inputName));
return tezEvent;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index af529bf..52761e2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -45,6 +45,8 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
private String diagnostics;
private TezCounters tezCounters;
private TaskAttemptTerminationCause error;
+ private TezTaskAttemptID lastDataEventSourceTA;
+ private long lastDataEventTime;
public TaskAttemptFinishedEvent(TezTaskAttemptID taId,
String vertexName,
@@ -52,7 +54,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
long finishTime,
TaskAttemptState state,
TaskAttemptTerminationCause error,
- String diagnostics, TezCounters counters) {
+ String diagnostics, TezCounters counters,
+ long lastDataEventTime,
+ TezTaskAttemptID lastDataEventSourceTA) {
this.taskAttemptId = taId;
this.vertexName = vertexName;
this.startTime = startTime;
@@ -61,6 +65,8 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
this.diagnostics = diagnostics;
this.tezCounters = counters;
this.error = error;
+ this.lastDataEventTime = lastDataEventTime;
+ this.lastDataEventSourceTA = lastDataEventSourceTA;
}
public TaskAttemptFinishedEvent() {
@@ -80,6 +86,14 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
public boolean isHistoryEvent() {
return true;
}
+
+ public long getLastDataEventTime() {
+ return lastDataEventTime;
+ }
+
+ public TezTaskAttemptID getLastDataEventSourceTA() {
+ return lastDataEventSourceTA;
+ }
public TaskAttemptFinishedProto toProto() {
TaskAttemptFinishedProto.Builder builder =
@@ -96,6 +110,10 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
if (tezCounters != null) {
builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
}
+ if (lastDataEventSourceTA != null) {
+ builder.setLastDataEventSourceTA(lastDataEventSourceTA.toString());
+ builder.setLastDataEventTime(lastDataEventTime);
+ }
return builder.build();
}
@@ -113,6 +131,10 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
proto.getCounters());
}
+ if (proto.hasLastDataEventSourceTA()) {
+ this.lastDataEventSourceTA = TezTaskAttemptID.fromString(proto.getLastDataEventSourceTA());
+ this.lastDataEventTime = proto.getLastDataEventTime();
+ }
}
@Override
@@ -140,6 +162,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
+ ", status=" + state.name()
+ ", errorEnum=" + (error != null ? error.name() : "")
+ ", diagnostics=" + diagnostics
+ + ", lastDataEventSourceTA=" +
+ ((lastDataEventSourceTA==null) ? null:lastDataEventSourceTA.toString())
+ + ", lastDataEventTime=" + lastDataEventTime
+ ", counters=" + (tezCounters == null ? "null" :
tezCounters.toString()
.replaceAll("\\n", ", ").replaceAll("\\s+", " "));
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
index 0310a26..6f44f33 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
@@ -145,6 +145,7 @@ public class VertexRecoverableEventsGeneratedEvent implements HistoryEvent {
if (event.getDestinationInfo() != null) {
evtBuilder.setDestinationInfo(convertEventMetaDataToProto(event.getDestinationInfo()));
}
+ evtBuilder.setEventTime(event.getEventReceivedTime());
tezEventProtos.add(evtBuilder.build());
}
}
@@ -184,7 +185,7 @@ public class VertexRecoverableEventsGeneratedEvent implements HistoryEvent {
if (eventProto.hasDestinationInfo()) {
destinationInfo = convertEventMetaDataFromProto(eventProto.getDestinationInfo());
}
- TezEvent tezEvent = new TezEvent(evt, sourceInfo);
+ TezEvent tezEvent = new TezEvent(evt, sourceInfo, eventProto.getEventTime());
tezEvent.setDestinationInfo(destinationInfo);
this.events.add(tezEvent);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 3fdfe0a..528da10 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -530,6 +530,10 @@ public class HistoryEventJsonConversion {
otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
otherInfo.put(ATSConstants.COUNTERS,
DAGUtils.convertCountersToJSON(event.getCounters()));
+ otherInfo.put(ATSConstants.LAST_DATA_EVENT_TIME, event.getLastDataEventTime());
+ if (event.getLastDataEventSourceTA() != null) {
+ otherInfo.put(ATSConstants.LAST_DATA_EVENT_SOURCE_TA, event.getLastDataEventSourceTA().toString());
+ }
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index 402349b..ffb382e 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -175,6 +175,8 @@ message TaskAttemptFinishedProto {
optional string diagnostics = 4;
optional TezCountersProto counters = 5;
optional string error_enum = 6;
+ optional int64 last_data_event_time = 7;
+ optional string last_data_event_source_t_a = 8;
}
message EventMetaDataProto {
@@ -191,6 +193,7 @@ message TezDataMovementEventProto {
optional CompositeEventProto composite_data_movement_event = 4;
optional RootInputDataInformationEventProto root_input_data_information_event = 5;
optional RootInputInitializerEventProto input_initializer_event = 6;
+ optional int64 event_time = 7;
}
message VertexDataMovementEventsGeneratedProto {
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 08f6ff6..8fa57d3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -114,7 +114,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
}
public static interface EventsDelegate {
- public void getEvents(TaskSpec taskSpec, List<TezEvent> events);
+ public void getEvents(TaskSpec taskSpec, List<TezEvent> events, long time);
}
// mock container launcher does not launch real tasks.
@@ -408,7 +408,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
List<TezEvent> events = Lists.newArrayListWithCapacity(
cData.taskSpec.getOutputs().size() + 1);
if (cData.numUpdates == 0 && eventsDelegate != null) {
- eventsDelegate.getEvents(cData.taskSpec, events);
+ eventsDelegate.getEvents(cData.taskSpec, events, getContext().getClock().getTime());
}
TezCounters counters = null;
if (countersDelegate != null) {
@@ -422,7 +422,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
float maxUpdates = (updatesToMake != null) ? updatesToMake.intValue() : 1;
float progress = updateProgress ? cData.numUpdates/maxUpdates : 0f;
events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats), new EventMetaData(
- EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId)));
+ EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId),
+ getContext().getClock().getTime()));
TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events,
cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 50000);
doHeartbeat(request, cData);
@@ -433,7 +434,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
cData.completed = true;
List<TezEvent> events = Collections.singletonList(new TezEvent(
new TaskAttemptCompletedEvent(), new EventMetaData(
- EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId)));
+ EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId),
+ getContext().getClock().getTime()));
TezHeartbeatRequest request = new TezHeartbeatRequest(++cData.numUpdates, events,
cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 10000);
doHeartbeat(request, cData);
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 4137d42..42d4b0b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -126,17 +126,17 @@ public class TestMockDAGAppMaster {
static class TestEventsDelegate implements EventsDelegate {
@Override
- public void getEvents(TaskSpec taskSpec, List<TezEvent> events) {
+ public void getEvents(TaskSpec taskSpec, List<TezEvent> events, long time) {
for (OutputSpec output : taskSpec.getOutputs()) {
if (output.getPhysicalEdgeCount() == 1) {
events.add(new TezEvent(DataMovementEvent.create(0, 0, 0, null), new EventMetaData(
EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), output
- .getDestinationVertexName(), taskSpec.getTaskAttemptID())));
+ .getDestinationVertexName(), taskSpec.getTaskAttemptID()), time));
} else {
events.add(new TezEvent(CompositeDataMovementEvent.create(0,
output.getPhysicalEdgeCount(), null), new EventMetaData(
EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), output
- .getDestinationVertexName(), taskSpec.getTaskAttemptID())));
+ .getDestinationVertexName(), taskSpec.getTaskAttemptID()), time));
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 5c24ecc..d8a7388 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -99,12 +99,15 @@ public class TestTaskAttemptListenerImplTezDag {
eventHandler = mock(EventHandler.class);
+ MockClock clock = new MockClock();
+
appContext = mock(AppContext.class);
doReturn(eventHandler).when(appContext).getEventHandler();
doReturn(dag).when(appContext).getCurrentDAG();
doReturn(appAcls).when(appContext).getApplicationACLs();
doReturn(amContainerMap).when(appContext).getAllContainers();
-
+ doReturn(clock).when(appContext).getClock();
+
taskAttemptListener = new TaskAttemptListenerImplForTest(appContext,
mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null);
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index d632aa3..920109b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -169,9 +169,11 @@ public class TestTaskAttemptRecovery {
errorEnum = TaskAttemptTerminationCause.APPLICATION_ERROR;
}
+ long lastDataEventTime = 1024;
+ TezTaskAttemptID lastDataEventTA = mock(TezTaskAttemptID.class);
TaskAttemptState recoveredState =
ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- startTime, finishTime, state, errorEnum, diag, counters));
+ startTime, finishTime, state, errorEnum, diag, counters, lastDataEventTime, lastDataEventTA));
assertEquals(startTime, ta.getLaunchTime());
assertEquals(finishTime, ta.getFinishTime());
assertEquals(counters, ta.reportedStatus.counters);
@@ -180,6 +182,8 @@ public class TestTaskAttemptRecovery {
assertEquals(1, ta.getDiagnostics().size());
assertEquals(diag, ta.getDiagnostics().get(0));
assertEquals(state, recoveredState);
+ assertEquals(lastDataEventTime, ta.lastDataEventTime);
+ assertEquals(lastDataEventTA, ta.lastDataEventSourceTA);
if (state != TaskAttemptState.SUCCEEDED) {
assertEquals(errorEnum, ta.getTerminationCause());
} else {
@@ -304,7 +308,7 @@ public class TestTaskAttemptRecovery {
TaskAttemptState recoveredState =
ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
startTime, finishTime, TaskAttemptState.KILLED,
- TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters()));
+ TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters(), 0, null));
assertEquals(TaskAttemptState.KILLED, recoveredState);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index feb290f..87e7498 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -286,7 +286,7 @@ public class TestTaskRecovery {
restoreFromTaskStartEvent();
TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- 0L, 0L, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"", new TezCounters()));
+ 0L, 0L, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"", new TezCounters(), 0, null));
task.handle(new TaskEventRecoverTask(task.getTaskId()));
// wait for the second task attempt is scheduled
dispatcher.await();
@@ -307,7 +307,7 @@ public class TestTaskRecovery {
restoreFromTaskStartEvent();
TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- 0L, 0L, TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"", new TezCounters()));
+ 0L, 0L, TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"", new TezCounters(), 0, null));
task.handle(new TaskEventRecoverTask(task.getTaskId()));
// wait for the second task attempt is scheduled
dispatcher.await();
@@ -329,7 +329,7 @@ public class TestTaskRecovery {
TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
try {
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- 0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters()));
+ 0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters(), 0, null));
fail("Should fail due to no TaskAttemptStartedEvent but with TaskAttemptFinishedEvent(Succeeded)");
} catch (TezUncheckedException e) {
assertTrue(e.getMessage().contains("Could not find task attempt when trying to recover"));
@@ -372,7 +372,7 @@ public class TestTaskRecovery {
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
- "", new TezCounters()));
+ "", new TezCounters(), 0, null));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -405,7 +405,7 @@ public class TestTaskRecovery {
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.FAILED, null,
- "", new TezCounters()));
+ "", new TezCounters(), 0, null));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -438,7 +438,7 @@ public class TestTaskRecovery {
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
- "", new TezCounters()));
+ "", new TezCounters(), 0, null));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -473,7 +473,7 @@ public class TestTaskRecovery {
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
- "", new TezCounters()));
+ "", new TezCounters(), 0, null));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -516,7 +516,7 @@ public class TestTaskRecovery {
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
- "", new TezCounters()));
+ "", new TezCounters(), 0, null));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -528,7 +528,7 @@ public class TestTaskRecovery {
recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.FAILED, null,
- "", new TezCounters()));
+ "", new TezCounters(), 0, null));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -563,7 +563,7 @@ public class TestTaskRecovery {
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
- "", new TezCounters()));
+ "", new TezCounters(), 0, null));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -575,7 +575,7 @@ public class TestTaskRecovery {
recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
- "", new TezCounters()));
+ "", new TezCounters(), 0, null));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -614,7 +614,7 @@ public class TestTaskRecovery {
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
- "", new TezCounters()));
+ "", new TezCounters(), 0, null));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -654,7 +654,7 @@ public class TestTaskRecovery {
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
- "", new TezCounters()));
+ "", new TezCounters(), 0, null));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -735,7 +735,7 @@ public class TestTaskRecovery {
recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
- "", new TezCounters()));
+ "", new TezCounters(), 0, null));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(TaskAttemptStateInternal.NEW,
((TaskAttemptImpl) task.getAttempt(taId)).getInternalState());
@@ -776,7 +776,7 @@ public class TestTaskRecovery {
task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
- 0, TaskAttemptState.KILLED, null, "", null));
+ 0, TaskAttemptState.KILLED, null, "", null, 0, null));
}
assertEquals(maxFailedAttempts, task.getAttempts().size());
assertEquals(0, task.failedAttempts);
@@ -806,7 +806,7 @@ public class TestTaskRecovery {
task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
- 0, TaskAttemptState.FAILED, null, "", null));
+ 0, TaskAttemptState.FAILED, null, "", null, 0, null));
}
assertEquals(maxFailedAttempts, task.getAttempts().size());
assertEquals(maxFailedAttempts, task.failedAttempts);
@@ -836,7 +836,7 @@ public class TestTaskRecovery {
task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
- 0, TaskAttemptState.FAILED, null, "", null));
+ 0, TaskAttemptState.FAILED, null, "", null, 0, null));
}
assertEquals(maxFailedAttempts - 1, task.getAttempts().size());
assertEquals(maxFailedAttempts - 1, task.failedAttempts);
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 98ef973..c55ea23 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -110,6 +110,7 @@ import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
+import org.apache.tez.dag.app.MockClock;
import org.apache.tez.dag.app.TaskAttemptEventInfo;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
@@ -2179,6 +2180,8 @@ public class TestVertexImpl {
}})
.when(execService).submit((Callable<Void>) any());
+ MockClock clock = new MockClock();
+
doReturn(execService).when(appContext).getExecService();
doReturn(conf).when(appContext).getAMConf();
doReturn(new Credentials()).when(dag).getCredentials();
@@ -2189,7 +2192,8 @@ public class TestVertexImpl {
doReturn(Resource.newInstance(102400, 60)).when(taskScheduler).getTotalResources();
doReturn(historyEventHandler).when(appContext).getHistoryHandler();
doReturn(dispatcher.getEventHandler()).when(appContext).getEventHandler();
-
+ doReturn(clock).when(appContext).getClock();
+
vertexGroups = Maps.newHashMap();
for (PlanVertexGroupInfo groupInfo : dagPlan.getVertexGroupsList()) {
vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 9be3531..a32cc27 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -505,7 +505,9 @@ public class TestHistoryEventsProtoConversion {
TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
"vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
- null, null, null);
+ null, null, null, 1024,
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 110), 1), 1));
TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getTaskAttemptID(),
@@ -518,6 +520,8 @@ public class TestHistoryEventsProtoConversion {
deserializedEvent.getState());
Assert.assertEquals(event.getCounters(),
deserializedEvent.getCounters());
+ Assert.assertEquals(event.getLastDataEventTime(), deserializedEvent.getLastDataEventTime());
+ Assert.assertEquals(event.getLastDataEventSourceTA(), deserializedEvent.getLastDataEventSourceTA());
logEvents(event, deserializedEvent);
}
{
@@ -525,7 +529,7 @@ public class TestHistoryEventsProtoConversion {
TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
"vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
- TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters());
+ TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters(), 0, null);
TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getTaskAttemptID(),
@@ -588,9 +592,10 @@ public class TestHistoryEventsProtoConversion {
} catch (RuntimeException e) {
// Expected
}
+ long eventTime = 1024;
List<TezEvent> events =
Arrays.asList(new TezEvent(DataMovementEvent.create(1, null),
- new EventMetaData(EventProducerConsumerType.SYSTEM, "foo", "bar", null)));
+ new EventMetaData(EventProducerConsumerType.SYSTEM, "foo", "bar", null), eventTime));
event = new VertexRecoverableEventsGeneratedEvent(
TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), events);
@@ -601,6 +606,8 @@ public class TestHistoryEventsProtoConversion {
deserializedEvent.getTezEvents().size());
Assert.assertEquals(event.getTezEvents().get(0).getEventType(),
deserializedEvent.getTezEvents().get(0).getEventType());
+ Assert.assertEquals(event.getTezEvents().get(0).getEventReceivedTime(),
+ deserializedEvent.getTezEvents().get(0).getEventReceivedTime());
logEvents(event, deserializedEvent);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index db871a2..ec1603e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -164,7 +164,7 @@ public class TestHistoryEventJsonConversion {
break;
case TASK_ATTEMPT_FINISHED:
event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
- random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null);
+ random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null, 0, null);
break;
case CONTAINER_LAUNCHED:
event = new ContainerLaunchedEvent(containerId, random.nextInt(),
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
index cca984a..b412c46 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
@@ -20,6 +20,8 @@ package org.apache.tez.history.parser.datamodel;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
+
+import org.apache.tez.common.ATSConstants;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
@@ -46,6 +48,9 @@ public class TaskAttemptInfo extends BaseInfo {
private final String status;
private final String logUrl;
private final String schedulingCausalTA;
+ private final long lastDataEventTime;
+ private final String lastDataEventSourceTA;
+
private TaskInfo taskInfo;
private Container container;
@@ -75,6 +80,8 @@ public class TaskAttemptInfo extends BaseInfo {
status = otherInfoNode.optString(Constants.STATUS);
container = new Container(containerId, nodeId);
+ lastDataEventTime = otherInfoNode.optLong(ATSConstants.LAST_DATA_EVENT_TIME);
+ lastDataEventSourceTA = otherInfoNode.optString(ATSConstants.LAST_DATA_EVENT_SOURCE_TA);
}
void setTaskInfo(TaskInfo taskInfo) {
@@ -104,6 +111,14 @@ public class TaskAttemptInfo extends BaseInfo {
public final long getAbsoluteScheduledTime() {
return scheduledTime;
}
+
+ public final long getLastDataEventTime() { // value read from other-info LAST_DATA_EVENT_TIME via optLong in the constructor
+ return lastDataEventTime;
+ }
+
+ public final String getLastDataEventSourceTA() { // value read from other-info LAST_DATA_EVENT_SOURCE_TA via optString; key may be absent (writer skips null) — default presumably "", confirm
+ return lastDataEventSourceTA;
+ }
public final long getTimeTaken() {
return getFinishTime() - getStartTime();
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
index 1d59e98..d205056 100644
--- a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
+++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
@@ -80,6 +80,7 @@ import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
+import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertTrue;
@@ -105,7 +106,6 @@ public class TestATSFileParser {
"target" + Path.SEPARATOR + TestATSFileParser.class.getName() + "-tez";
private static String DOWNLOAD_DIR = TEST_ROOT_DIR + Path.SEPARATOR + "download";
- private static String timelineAddress;
private static TezClient tezClient;
private static int dagNumber;
@@ -228,8 +228,11 @@ public class TestATSFileParser {
assertTrue(edgeInfo.getEdgeSourceClass().equals(OrderedPartitionedKVOutput.class.getName()));
assertTrue(edgeInfo.getEdgeDestinationClass().equals(OrderedGroupedKVInput.class.getName()));
assertTrue(dagInfo.getVertices().size() == 2);
+ String lastSourceTA = null;
+ String lastDataEventSourceTA = null;
for (VertexInfo vertexInfo : dagInfo.getVertices()) {
assertTrue(vertexInfo.getKilledTasksCount() == 0);
+ long finishTime = 0;
for (TaskInfo taskInfo : vertexInfo.getTasks()) {
assertTrue(taskInfo.getNumberOfTaskAttempts() == 1);
assertTrue(taskInfo.getMaxTaskAttemptDuration() >= 0);
@@ -240,6 +243,24 @@ public class TestATSFileParser {
assertTrue(taskInfo.getSuccessfulTaskAttempts().size() > 0);
assertTrue(taskInfo.getFailedTaskAttempts().size() == 0);
assertTrue(taskInfo.getKilledTaskAttempts().size() == 0);
+ List<TaskAttemptInfo> attempts = taskInfo.getTaskAttempts();
+ if (vertexInfo.getVertexName().equals(TOKENIZER)) {
+ // get the last task to finish and track its successful attempt
+ if (finishTime < taskInfo.getAbsFinishTime()) {
+ finishTime = taskInfo.getAbsFinishTime();
+ lastSourceTA = taskInfo.getSuccessfulAttemptId();
+ }
+ } else {
+ for (TaskAttemptInfo attempt : attempts) {
+ assertTrue(attempt.getLastDataEventTime() > 0);
+ if (lastDataEventSourceTA == null) {
+ lastDataEventSourceTA = attempt.getLastDataEventSourceTA();
+ } else {
+ // all attempts should have the same last data event source TA
+ assertTrue(lastDataEventSourceTA.equals(attempt.getLastDataEventSourceTA()));
+ }
+ }
+ }
for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
assertTrue(attemptInfo.getStartTime() > 0);
assertTrue(attemptInfo.getScheduledTime() > 0);
@@ -258,6 +279,7 @@ public class TestATSFileParser {
assertTrue(vertexInfo.getInputVertices().size() == 1);
}
}
+ assertTrue(lastSourceTA.equals(lastDataEventSourceTA));
}
/**
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 95f77e2..eaed115 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -432,7 +432,11 @@ public class HistoryEventTimelineConversion {
atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
atsEntity.addOtherInfo(ATSConstants.COUNTERS,
DAGUtils.convertCountersToATSMap(event.getCounters()));
-
+ atsEntity.addOtherInfo(ATSConstants.LAST_DATA_EVENT_TIME, event.getLastDataEventTime());
+ if (event.getLastDataEventSourceTA() != null) {
+ atsEntity.addOtherInfo(ATSConstants.LAST_DATA_EVENT_SOURCE_TA,
+ event.getLastDataEventSourceTA().toString());
+ }
return atsEntity;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index bf8d0ec..838d9d6 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -169,7 +169,7 @@ public class TestHistoryEventTimelineConversion {
break;
case TASK_ATTEMPT_FINISHED:
event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
- random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null);
+ random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null, 0, null);
break;
case CONTAINER_LAUNCHED:
event = new ContainerLaunchedEvent(containerId, random.nextInt(),
@@ -450,9 +450,10 @@ public class TestHistoryEventTimelineConversion {
.values()[random.nextInt(TaskAttemptTerminationCause.values().length)];
String diagnostics = "random diagnostics message";
TezCounters counters = new TezCounters();
+ long lastDataEventTime = finishTime - 1;
TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName,
- startTime, finishTime, state, error, diagnostics, counters);
+ startTime, finishTime, state, error, diagnostics, counters, lastDataEventTime, tezTaskAttemptID);
TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
@@ -475,12 +476,14 @@ public class TestHistoryEventTimelineConversion {
Assert.assertEquals(finishTime, evt.getTimestamp());
final Map<String, Object> otherInfo = timelineEntity.getOtherInfo();
- Assert.assertEquals(6, otherInfo.size());
+ Assert.assertEquals(8, otherInfo.size());
Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME));
Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN));
Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS));
Assert.assertEquals(error.name(), otherInfo.get(ATSConstants.TASK_ATTEMPT_ERROR_ENUM));
Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS));
+ Assert.assertEquals(lastDataEventTime, otherInfo.get(ATSConstants.LAST_DATA_EVENT_TIME));
+ Assert.assertEquals(tezTaskAttemptID.toString(), otherInfo.get(ATSConstants.LAST_DATA_EVENT_SOURCE_TA));
Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS));
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index 974e190..b44b7d4 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -55,12 +55,19 @@ public class TezEvent implements Writable {
private EventMetaData sourceInfo;
private EventMetaData destinationInfo;
+
+ private long eventReceivedTime;
public TezEvent() {
}
public TezEvent(Event event, EventMetaData sourceInfo) {
+ this(event, sourceInfo, System.currentTimeMillis()); // default received time: wall-clock time at construction
+ }
+
+ public TezEvent(Event event, EventMetaData sourceInfo, long time) {
this.event = event;
+ this.eventReceivedTime = time;
this.setSourceInfo(sourceInfo);
if (event instanceof DataMovementEvent) {
eventType = EventType.DATA_MOVEMENT_EVENT;
@@ -91,6 +98,14 @@ public class TezEvent implements Writable {
public Event getEvent() {
return event;
}
+
+ public void setEventReceivedTime(long eventReceivedTime) { // TODO(review): old note said "save"; write()/readFields() now serialize this field — confirm whether anything remains to do
+ this.eventReceivedTime = eventReceivedTime;
+ }
+
+ public long getEventReceivedTime() { // set by a constructor (default System.currentTimeMillis()), setEventReceivedTime, or readFields
+ return eventReceivedTime;
+ }
public EventMetaData getSourceInfo() {
return sourceInfo;
@@ -119,6 +134,7 @@ public class TezEvent implements Writable {
}
out.writeBoolean(true);
out.writeInt(eventType.ordinal());
+ out.writeLong(eventReceivedTime);
if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
// TODO NEWTEZ convert to PB
TaskStatusUpdateEvent sEvt = (TaskStatusUpdateEvent) event;
@@ -188,6 +204,7 @@ public class TezEvent implements Writable {
return;
}
eventType = EventType.values()[in.readInt()];
+ eventReceivedTime = in.readLong();
if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
// TODO NEWTEZ convert to PB
event = new TaskStatusUpdateEvent();