You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/09/09 22:59:08 UTC
[2/2] tez git commit: TEZ-2789. Backport events added in TEZ-2612 to
branch-0.7 (bikas)
TEZ-2789. Backport events added in TEZ-2612 to branch-0.7 (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bc56ca31
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bc56ca31
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bc56ca31
Branch: refs/heads/branch-0.7
Commit: bc56ca3157973971b7e0e21ed834d56ecc7cdd46
Parents: 9be8cd4
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Sep 9 13:54:23 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Sep 9 13:54:23 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/common/ATSConstants.java | 4 +
.../dag/app/TaskAttemptListenerImpTezDag.java | 19 ++-
.../org/apache/tez/dag/app/dag/TaskAttempt.java | 3 +
.../dag/event/TaskAttemptEventStatusUpdate.java | 9 ++
.../dag/app/dag/event/TaskEventTAUpdate.java | 14 +-
.../org/apache/tez/dag/app/dag/impl/Edge.java | 13 +-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 138 +++++++++++++++++--
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 61 ++++++--
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 27 +++-
.../tez/dag/app/dag/impl/VertexManager.java | 3 +-
.../tez/dag/app/rm/container/AMContainer.java | 1 +
.../dag/app/rm/container/AMContainerImpl.java | 12 ++
.../events/TaskAttemptFinishedEvent.java | 32 ++++-
.../history/events/TaskAttemptStartedEvent.java | 52 +++++--
.../VertexRecoverableEventsGeneratedEvent.java | 3 +-
.../impl/HistoryEventJsonConversion.java | 9 ++
.../apache/tez/dag/history/utils/DAGUtils.java | 24 ++++
tez-dag/src/main/proto/HistoryEvents.proto | 10 ++
.../apache/tez/dag/app/MockDAGAppMaster.java | 10 +-
.../tez/dag/app/TestMockDAGAppMaster.java | 6 +-
.../app/TestTaskAttemptListenerImplTezDag.java | 38 ++++-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 89 ++++++++++++
.../app/dag/impl/TestTaskAttemptRecovery.java | 27 +++-
.../tez/dag/app/dag/impl/TestTaskImpl.java | 41 +++++-
.../tez/dag/app/dag/impl/TestTaskRecovery.java | 46 +++----
.../tez/dag/app/dag/impl/TestVertexImpl.java | 31 ++++-
.../dag/app/rm/container/TestAMContainer.java | 5 +-
.../TestHistoryEventsProtoConversion.java | 30 +++-
.../impl/TestHistoryEventJsonConversion.java | 4 +-
.../ats/HistoryEventTimelineConversion.java | 11 +-
.../ats/TestHistoryEventTimelineConversion.java | 31 ++++-
.../apache/tez/runtime/api/impl/TezEvent.java | 17 +++
33 files changed, 710 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 812004c..90935c7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2789. Backport events added in TEZ-2612 to branch-0.7
TEZ-2766. Tez UI: Add vertex in-progress info in DAG details
TEZ-2768. Log a useful error message when the summary stream cannot be closed when shutting
down an AM.
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index a85dbd9..f786a4e 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -58,6 +58,8 @@ public class ATSConstants {
public static final String VERTEX_NAME = "vertexName";
public static final String VERTEX_NAME_ID_MAPPING = "vertexNameIdMapping";
public static final String SCHEDULED_TIME = "scheduledTime";
+ public static final String CREATION_TIME = "creationTime";
+ public static final String ALLOCATION_TIME = "allocationTime";
public static final String INIT_REQUESTED_TIME = "initRequestedTime";
public static final String INIT_TIME = "initTime";
public static final String START_REQUESTED_TIME = "startRequestedTime";
@@ -82,7 +84,9 @@ public class ATSConstants {
public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL";
public static final String COMPLETED_LOGS_URL = "completedLogsURL";
public static final String EXIT_STATUS = "exitStatus";
+ public static final String LAST_DATA_EVENTS = "lastDataEvents";
public static final String UPDATED_EDGE_MANAGERS = "updatedEdgeManagers";
+ public static final String CREATION_CAUSAL_ATTEMPT = "creationCausalAttempt";
/* Counters-related keys */
public static final String COUNTER_GROUPS = "counterGroups";
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index ddbee59..5ef89f6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.commons.collections4.ListUtils;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventType;
import org.slf4j.Logger;
@@ -428,22 +427,34 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
" events: " + (inEvents != null? inEvents.size() : -1));
}
+ long currTime = context.getClock().getTime();
List<TezEvent> otherEvents = new ArrayList<TezEvent>();
// route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events
// (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT)
// to VertexImpl to ensure the events ordering
// 1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent
// 2. TaskStatusEvent is handled before TaskAttemptFinishedEvent
+ TaskAttemptEventStatusUpdate taskAttemptEvent = null;
+ boolean readErrorReported = false;
for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
+ // for now, set the event time on the AM when it is received.
+ // this avoids any time disparity between machines.
+ tezEvent.setEventReceivedTime(currTime);
final EventType eventType = tezEvent.getEventType();
if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
- TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
- (TaskStatusUpdateEvent) tezEvent.getEvent());
- context.getEventHandler().handle(taskAttemptEvent);
+ taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
+ (TaskStatusUpdateEvent) tezEvent.getEvent());
} else {
+ if (eventType == EventType.INPUT_READ_ERROR_EVENT) {
+ readErrorReported = true;
+ }
otherEvents.add(tezEvent);
}
}
+ if (taskAttemptEvent != null) {
+ taskAttemptEvent.setReadErrorReported(readErrorReported);
+ context.getEventHandler().handle(taskAttemptEvent);
+ }
if(!otherEvents.isEmpty()) {
TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
context.getEventHandler().handle(
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index 6c85cc2..4360cc3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -34,6 +34,7 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.impl.TezEvent;
/**
* Read only view of TaskAttempt.
@@ -79,6 +80,8 @@ public interface TaskAttempt {
float getProgress();
TaskAttemptState getState();
TaskAttemptState getStateNoLock();
+
+ void setLastEventSent(TezEvent lastEventSent);
/**
* Has attempt reached the final state or not.
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
index c5a6ea7..458679c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
@@ -24,6 +24,7 @@ import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent {
private TaskStatusUpdateEvent taskAttemptStatus;
+ private boolean readErrorReported = false;
public TaskAttemptEventStatusUpdate(TezTaskAttemptID id,
TaskStatusUpdateEvent statusEvent) {
@@ -34,4 +35,12 @@ public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent {
public TaskStatusUpdateEvent getStatusEvent() {
return this.taskAttemptStatus;
}
+
+ public void setReadErrorReported(boolean value) {
+ readErrorReported = value;
+ }
+
+ public boolean getReadErrorReported() {
+ return readErrorReported;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java
index 59c7363..01eaf5b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java
@@ -18,19 +18,31 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.common.TezAbstractEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
+@SuppressWarnings("rawtypes")
public class TaskEventTAUpdate extends TaskEvent {
private TezTaskAttemptID attemptID;
+ private TezAbstractEvent causalEvent;
public TaskEventTAUpdate(TezTaskAttemptID id, TaskEventType type) {
+ this(id, type, null);
+ }
+
+ public TaskEventTAUpdate(TezTaskAttemptID id, TaskEventType type, TezAbstractEvent causalEvent) {
super(id.getTaskID(), type);
this.attemptID = id;
+ this.causalEvent = causalEvent;
}
-
+
public TezTaskAttemptID getTaskAttemptID() {
return attemptID;
}
+
+ public TezAbstractEvent getCausalEvent() {
+ return causalEvent;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 576e1cf..0be7790 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -382,7 +382,7 @@ public class Edge {
EventMetaData srcInfo = tezEvent.getSourceInfo();
for (DataMovementEvent dmEvent : compEvent.getEvents()) {
- TezEvent newEvent = new TezEvent(dmEvent, srcInfo);
+ TezEvent newEvent = new TezEvent(dmEvent, srcInfo, tezEvent.getEventReceivedTime());
sendTezEventToDestinationTasks(newEvent);
}
}
@@ -411,7 +411,7 @@ public class Edge {
InputFailedEvent ifEvent = ((InputFailedEvent) event);
e = InputFailedEvent.create(inputIndex, ifEvent.getVersion());
}
- tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+ tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(), tezEvent.getEventReceivedTime());
tezEventToSend.setDestinationInfo(destinationMetaInfo);
// cache the event object per input because events are unique per input index
inputIndicesWithEvents.put(inputIndex, tezEventToSend);
@@ -558,7 +558,8 @@ public class Edge {
DataMovementEvent e = compEvent.expand(sourceIndices[numEventsDone],
targetIndices[numEventsDone]);
numEventsDone++;
- TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+ TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(),
+ tezEvent.getEventReceivedTime());
tezEventToSend.setDestinationInfo(destinationMetaInfo);
listToAdd.add(tezEventToSend);
}
@@ -590,7 +591,8 @@ public class Edge {
while (numEventsDone < numEvents && listSize++ < listMaxSize) {
InputFailedEvent e = ifEvent.makeCopy(targetIndices[numEventsDone]);
numEventsDone++;
- TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+ TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(),
+ tezEvent.getEventReceivedTime());
tezEventToSend.setDestinationInfo(destinationMetaInfo);
listToAdd.add(tezEventToSend);
}
@@ -622,7 +624,8 @@ public class Edge {
while (numEventsDone < numEvents && listSize++ < listMaxSize) {
DataMovementEvent e = dmEvent.makeCopy(targetIndices[numEventsDone]);
numEventsDone++;
- TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+ TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(),
+ tezEvent.getEventReceivedTime());
tezEventToSend.setDestinationInfo(destinationMetaInfo);
listToAdd.add(tezEventToSend);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 5792af0..266358d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -87,6 +87,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.SpeculatorEventTaskAttemptStatusUpdate;
import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
+import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
@@ -96,6 +97,7 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto;
import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
@@ -118,6 +120,37 @@ public class TaskAttemptImpl implements TaskAttempt,
private static final Logger LOG = LoggerFactory.getLogger(TaskAttemptImpl.class);
private static final String LINE_SEPARATOR = System
.getProperty("line.separator");
+
+ public static class DataEventDependencyInfo {
+ long timestamp;
+ TezTaskAttemptID taId;
+ public DataEventDependencyInfo(long time, TezTaskAttemptID id) {
+ this.timestamp = time;
+ this.taId = id;
+ }
+ public long getTimestamp() {
+ return timestamp;
+ }
+ public TezTaskAttemptID getTaskAttemptId() {
+ return taId;
+ }
+ public static DataEventDependencyInfoProto toProto(DataEventDependencyInfo info) {
+ DataEventDependencyInfoProto.Builder builder = DataEventDependencyInfoProto.newBuilder();
+ builder.setTimestamp(info.timestamp);
+ if (info.taId != null) {
+ builder.setTaskAttemptId(info.taId.toString());
+ }
+ return builder.build();
+ }
+
+ public static DataEventDependencyInfo fromProto(DataEventDependencyInfoProto proto) {
+ TezTaskAttemptID taId = null;
+ if(proto.hasTaskAttemptId()) {
+ taId = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
+ }
+ return new DataEventDependencyInfo(proto.getTimestamp(), taId);
+ }
+ }
static final TezCounters EMPTY_COUNTERS = new TezCounters();
@@ -139,6 +172,7 @@ public class TaskAttemptImpl implements TaskAttempt,
// TODO Can these be replaced by the container object TEZ-1037
private Container container;
+ private long allocationTime;
private ContainerId containerId;
private NodeId containerNodeId;
private String nodeHttpAddress;
@@ -148,6 +182,10 @@ public class TaskAttemptImpl implements TaskAttempt,
private final Vertex vertex;
@VisibleForTesting
+ boolean appendNextDataEvent = true;
+ ArrayList<DataEventDependencyInfo> lastDataEvents = Lists.newArrayList();
+
+ @VisibleForTesting
TaskAttemptStatus reportedStatus;
private DAGCounter localityCounter;
@@ -166,6 +204,9 @@ public class TaskAttemptImpl implements TaskAttempt,
private final Resource taskResource;
private final ContainerContext containerContext;
private final boolean leafVertex;
+
+ private TezTaskAttemptID creationCausalTA;
+ private long creationTime;
protected static final FailedTransitionHelper FAILED_HELPER =
new FailedTransitionHelper();
@@ -411,12 +452,22 @@ public class TaskAttemptImpl implements TaskAttempt,
boolean isRescheduled,
Resource resource, ContainerContext containerContext, boolean leafVertex,
Task task) {
+ this(taskId, attemptNumber, eventHandler, taskAttemptListener, conf, clock,
+ taskHeartbeatHandler, appContext, isRescheduled, resource, containerContext, leafVertex,
+ task, null);
+ }
+ public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
+ TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock,
+ TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
+ boolean isRescheduled,
+ Resource resource, ContainerContext containerContext, boolean leafVertex,
+ Task task, TezTaskAttemptID schedulingCausalTA) {
- this.MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration
+ MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration
.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration
.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT);
- this.MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = conf.getDouble(TezConfiguration
+ MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = conf.getDouble(TezConfiguration
.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, TezConfiguration
.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT);
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@@ -431,6 +482,8 @@ public class TaskAttemptImpl implements TaskAttempt,
this.appContext = appContext;
this.task = task;
this.vertex = this.task.getVertex();
+ this.creationCausalTA = schedulingCausalTA;
+ this.creationTime = clock.getTime();
this.reportedStatus = new TaskAttemptStatus(this.attemptId);
initTaskAttemptStatus(reportedStatus);
@@ -462,6 +515,10 @@ public class TaskAttemptImpl implements TaskAttempt,
public TezDAGID getDAGID() {
return getVertexID().getDAGId();
}
+
+ public TezTaskAttemptID getSchedulingCausalTA() {
+ return creationCausalTA;
+ }
TaskSpec createRemoteTaskSpec() throws AMUserCodeException {
TaskSpec baseTaskSpec = task.getBaseTaskSpec();
@@ -660,6 +717,33 @@ public class TaskAttemptImpl implements TaskAttempt,
}
}
+ public long getCreationTime() {
+ readLock.lock();
+ try {
+ return creationTime;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public TezTaskAttemptID getCreationCausalAttempt() {
+ readLock.lock();
+ try {
+ return creationCausalTA;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public long getAllocationTime() {
+ readLock.lock();
+ try {
+ return allocationTime;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
@Override
public long getFinishTime() {
readLock.lock();
@@ -753,6 +837,9 @@ public class TaskAttemptImpl implements TaskAttempt,
{
TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) historyEvent;
this.launchTime = tEvent.getStartTime();
+ this.creationTime = tEvent.getCreationTime();
+ this.allocationTime = tEvent.getAllocationTime();
+ this.creationCausalTA = tEvent.getCreationCausalTA();
recoveryStartEventSeen = true;
recoveredState = TaskAttemptState.RUNNING;
this.containerId = tEvent.getContainerId();
@@ -770,6 +857,9 @@ public class TaskAttemptImpl implements TaskAttempt,
: TaskAttemptTerminationCause.UNKNOWN_ERROR;
this.diagnostics.add(tEvent.getDiagnostics());
this.recoveredState = tEvent.getState();
+ if (tEvent.getDataEvents() != null) {
+ this.lastDataEvents.addAll(tEvent.getDataEvents());
+ }
sendEvent(createDAGCounterUpdateEventTAFinished(this, tEvent.getState()));
return recoveredState;
}
@@ -973,7 +1063,8 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent(
attemptId, getVertex().getName(),
launchTime, containerId, containerNodeId,
- inProgressLogsUrl, completedLogsUrl, nodeHttpAddress);
+ inProgressLogsUrl, completedLogsUrl, nodeHttpAddress, creationTime, creationCausalTA,
+ allocationTime);
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGID(), startEvt));
}
@@ -985,7 +1076,7 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
attemptId, getVertex().getName(), getLaunchTime(),
getFinishTime(), TaskAttemptState.SUCCEEDED, null,
- "", getCounters());
+ "", getCounters(), lastDataEvents);
// FIXME how do we store information regarding completion events
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1002,7 +1093,7 @@ public class TaskAttemptImpl implements TaskAttempt,
finishTime, state,
terminationCause,
StringUtils.join(
- getDiagnostics(), LINE_SEPARATOR), getCounters());
+ getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEvents);
// FIXME how do we store information regarding completion events
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1130,7 +1221,7 @@ public class TaskAttemptImpl implements TaskAttempt,
.getTaskAttemptState());
// Send out events to the Task - indicating TaskAttemptTermination(F/K)
ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, helper
- .getTaskEventType()));
+ .getTaskEventType(), event));
}
}
@@ -1140,9 +1231,10 @@ public class TaskAttemptImpl implements TaskAttempt,
public void transition(TaskAttemptImpl ta, TaskAttemptEvent origEvent) {
TaskAttemptEventStartedRemotely event = (TaskAttemptEventStartedRemotely) origEvent;
- Container container = ta.appContext.getAllContainers()
- .get(event.getContainerId()).getContainer();
+ AMContainer amContainer = ta.appContext.getAllContainers().get(event.getContainerId());
+ Container container = amContainer.getContainer();
+ ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime();
ta.container = container;
ta.containerId = event.getContainerId();
ta.containerNodeId = container.getNodeId();
@@ -1268,12 +1360,16 @@ public class TaskAttemptImpl implements TaskAttempt,
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
- TaskStatusUpdateEvent statusEvent = ((TaskAttemptEventStatusUpdate) event)
- .getStatusEvent();
+ TaskAttemptEventStatusUpdate sEvent = (TaskAttemptEventStatusUpdate) event;
+ TaskStatusUpdateEvent statusEvent = sEvent.getStatusEvent();
ta.reportedStatus.state = ta.getState();
ta.reportedStatus.progress = statusEvent.getProgress();
ta.reportedStatus.counters = statusEvent.getCounters();
ta.statistics = statusEvent.getStatistics();
+ if (sEvent.getReadErrorReported()) {
+ // after a read error, record the next data event as a new entry instead of overwriting the last
+ ta.appendNextDataEvent = true;
+ }
ta.updateProgressSplits();
@@ -1518,7 +1614,7 @@ public class TaskAttemptImpl implements TaskAttempt,
new EventMetaData(EventProducerConsumerType.SYSTEM,
vertex.getName(),
edgeVertex.getName(),
- getID())));
+ getID()), appContext.getClock().getTime()));
}
sendEvent(new VertexEventRouteEvent(vertex.getVertexId(), tezIfEvents));
}
@@ -1595,4 +1691,24 @@ public class TaskAttemptImpl implements TaskAttempt,
public String toString() {
return getID().toString();
}
+
+
+ @Override
+ public void setLastEventSent(TezEvent lastEventSent) {
+ writeLock.lock();
+ try {
+ DataEventDependencyInfo info = new DataEventDependencyInfo(
+ lastEventSent.getEventReceivedTime(), lastEventSent.getSourceInfo().getTaskAttemptID());
+ // task attempt id may be null for input data information events
+ if (appendNextDataEvent) {
+ appendNextDataEvent = false;
+ lastDataEvents.add(info);
+ } else {
+ // over-write last event - array list makes it quick
+ lastDataEvents.set(lastDataEvents.size() - 1, info);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index ef8e33a..e6027f5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -70,6 +70,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
@@ -549,7 +550,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
private TaskAttempt createRecoveredTaskAttempt(TezTaskAttemptID tezTaskAttemptID) {
- TaskAttempt taskAttempt = createAttempt(tezTaskAttemptID.getId());
+ TaskAttempt taskAttempt = createAttempt(tezTaskAttemptID.getId(), null);
return taskAttempt;
}
@@ -814,10 +815,10 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
}
- TaskAttemptImpl createAttempt(int attemptNumber) {
+ TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedulingCausalTA) {
return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,
- (failedAttempts > 0), taskResource, containerContext, leafVertex, this);
+ (failedAttempts > 0), taskResource, containerContext, leafVertex, this, schedulingCausalTA);
}
@Override
@@ -834,8 +835,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
// This is always called in the Write Lock
- private void addAndScheduleAttempt() {
- TaskAttempt attempt = createAttempt(attempts.size());
+ private void addAndScheduleAttempt(TezTaskAttemptID schedulingCausalTA) {
+ TaskAttempt attempt = createAttempt(attempts.size(), schedulingCausalTA);
if (LOG.isDebugEnabled()) {
LOG.debug("Created attempt " + attempt.getID());
}
@@ -1048,7 +1049,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskEventScheduleTask scheduleEvent = (TaskEventScheduleTask) event;
task.locationHint = scheduleEvent.getTaskLocationHint();
task.baseTaskSpec = scheduleEvent.getBaseTaskSpec();
- task.addAndScheduleAttempt();
+ // For now, initial scheduling dependency is due to vertex manager scheduling
+ task.addAndScheduleAttempt(null);
task.scheduledTime = task.clock.getTime();
task.logJobHistoryTaskStartedEvent();
}
@@ -1066,7 +1068,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
public void transition(TaskImpl task, TaskEvent event) {
LOG.info("Scheduling a redundant attempt for task " + task.taskId);
task.counters.findCounter(TaskCounter.NUM_SPECULATIONS).increment(1);
- task.addAndScheduleAttempt();
+ TezTaskAttemptID earliestUnfinishedAttempt = null;
+ for (TaskAttempt ta : task.attempts.values()) {
+ // intent: oldest running attempt; as written (no break) the last unfinished one iterated is kept
+ if (!ta.isFinished()) {
+ earliestUnfinishedAttempt = ta.getID();
+ }
+ }
+ task.addAndScheduleAttempt(earliestUnfinishedAttempt);
}
}
@@ -1143,9 +1152,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// we already have a spare
task.taskAttemptStatus.put(castEvent.getTaskAttemptID().getId(), true);
task.getVertex().incrementKilledTaskAttemptCount();
- if (task.getUncompletedAttemptsCount() == 0
- && task.successfulAttempt == null) {
- task.addAndScheduleAttempt();
+ if (task.shouldScheduleNewAttempt()) {
+ task.addAndScheduleAttempt(castEvent.getTaskAttemptID());
}
}
}
@@ -1255,7 +1263,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// If any are incomplete, the running attempt will be moved to failed and its
// update will trigger a new attempt if possible
if (task.attempts.size() == task.getFinishedAttemptsCount()) {
- task.addAndScheduleAttempt();
+ task.addAndScheduleAttempt(null);
}
endState = TaskStateInternal.RUNNING;
break;
@@ -1304,15 +1312,23 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
return task.getInternalState();
}
}
+
+ private boolean shouldScheduleNewAttempt() {
+ return (getUncompletedAttemptsCount() == 0
+ && successfulAttempt == null);
+ }
private static class AttemptFailedTransition implements
MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
+ private TezTaskAttemptID schedulingCausalTA;
+
@Override
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
task.failedAttempts++;
task.getVertex().incrementFailedTaskAttemptCount();
TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event;
+ schedulingCausalTA = castEvent.getTaskAttemptID();
task.addDiagnosticInfo("TaskAttempt " + castEvent.getTaskAttemptID().getId() + " failed,"
+ " info=" + task.getAttempt(castEvent.getTaskAttemptID()).getDiagnostics());
if (task.commitAttempt != null &&
@@ -1327,12 +1343,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
((TaskEventTAUpdate) event).getTaskAttemptID(),
TaskAttemptStateInternal.FAILED);
// we don't need a new event if we already have a spare
- if (task.getUncompletedAttemptsCount() == 0
- && task.successfulAttempt == null) {
+ if (task.shouldScheduleNewAttempt()) {
LOG.info("Scheduling new attempt for task: " + task.getTaskId()
+ ", currentFailedAttempts: " + task.failedAttempts + ", maxFailedAttempts: "
+ task.maxFailedAttempts);
- task.addAndScheduleAttempt();
+ task.addAndScheduleAttempt(getSchedulingCausalTA());
}
} else {
LOG.info("Failing task: " + task.getTaskId()
@@ -1352,11 +1367,17 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
protected TaskStateInternal getDefaultState(TaskImpl task) {
return task.getInternalState();
}
+
+ protected TezTaskAttemptID getSchedulingCausalTA() {
+ return schedulingCausalTA;
+ }
}
private static class TaskRetroactiveFailureTransition
extends AttemptFailedTransition {
+ private TezTaskAttemptID schedulingCausalTA;
+
@Override
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
if (task.leafVertex) {
@@ -1386,6 +1407,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// succeeded state
return TaskStateInternal.SUCCEEDED;
}
+
+ Preconditions.checkState(castEvent.getCausalEvent() != null);
+ TaskAttemptEventOutputFailed destinationEvent =
+ (TaskAttemptEventOutputFailed) castEvent.getCausalEvent();
+ schedulingCausalTA = destinationEvent.getInputFailedEvent().getSourceInfo().getTaskAttemptID();
// super.transition is mostly coded for the case where an
// UNcompleted task failed. When a COMPLETED task retroactively
@@ -1402,6 +1428,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
return returnState;
}
+
+ @Override
+ protected TezTaskAttemptID getSchedulingCausalTA() {
+ return schedulingCausalTA;
+ }
@Override
protected TaskStateInternal getDefaultState(TaskImpl task) {
@@ -1433,7 +1464,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// from the map splitInfo. So the bad node might be sent as a location
// to the RM. But the RM would ignore that just like it would ignore
// currently pending container requests affinitized to bad nodes.
- task.addAndScheduleAttempt();
+ task.addAndScheduleAttempt(attemptId);
return TaskStateInternal.SCHEDULED;
} else {
// nothing to do
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 7cf12e3..95ee298 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -3446,6 +3446,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
vertex.distanceFromRoot = distanceFromRoot;
}
vertex.numStartedSourceVertices++;
+ vertex.startTimeRequested = vertex.clock.getTime();
LOG.info("Source vertex started: " + startEvent.getSourceVertexId() +
" for vertex: " + vertex.logIdentifier + " numStartedSources: " +
vertex.numStartedSourceVertices + " numSources: " + vertex.sourceVertices.size());
@@ -3504,12 +3505,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
Preconditions.checkState(
(vertex.sourceVertices == null || vertex.sourceVertices.isEmpty()),
"Vertex: " + vertex.logIdentifier + " got invalid start event");
- vertex.startTimeRequested = vertex.clock.getTime();
vertex.startSignalPending = true;
+ vertex.startTimeRequested = vertex.clock.getTime();
}
}
-
+
public static class StartTransition implements
MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
@@ -3517,7 +3518,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
public VertexState transition(VertexImpl vertex, VertexEvent event) {
Preconditions.checkState(vertex.getState() == VertexState.INITED,
"Unexpected state " + vertex.getState() + " for " + vertex.logIdentifier);
- vertex.startTimeRequested = vertex.clock.getTime();
+ // if the start signal is pending this event is a fake start event to trigger this transition
+ if (!vertex.startSignalPending) {
+ vertex.startTimeRequested = vertex.clock.getTime();
+ }
return vertex.startVertex();
}
}
@@ -4083,7 +4087,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
@Override
public TaskAttemptEventInfo getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
int fromEventId, int preRoutedFromEventId, int maxEvents) {
- ArrayList<TezEvent> events = getTask(attemptID.getTaskID()).getTaskAttemptTezEvents(
+ Task task = getTask(attemptID.getTaskID());
+ ArrayList<TezEvent> events = task.getTaskAttemptTezEvents(
attemptID, preRoutedFromEventId, maxEvents);
int nextPreRoutedFromEventId = preRoutedFromEventId + events.size();
int nextFromEventId = fromEventId;
@@ -4169,6 +4174,20 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
} finally {
onDemandRouteEventsReadLock.unlock();
}
+ if (!events.isEmpty()) {
+ for (int i=(events.size() - 1); i>=0; --i) {
+ TezEvent lastEvent = events.get(i);
+ // record the last event sent by the AM to the task
+ EventType lastEventType = lastEvent.getEventType();
+ // if the following changes then critical path logic/recording may need revision
+ if (lastEventType == EventType.COMPOSITE_DATA_MOVEMENT_EVENT ||
+ lastEventType == EventType.DATA_MOVEMENT_EVENT ||
+ lastEventType == EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) {
+ task.getAttempt(attemptID).setLastEventSent(lastEvent);
+ break;
+ }
+ }
+ }
return new TaskAttemptEventInfo(nextFromEventId, events, nextPreRoutedFromEventId);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 0cc6666..803159a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -231,11 +231,12 @@ public class VertexManager {
Collection<InputDataInformationEvent> events) {
checkAndThrowIfDone();
verifyIsRootInput(inputName);
+ final long currTime = appContext.getClock().getTime();
Collection<TezEvent> tezEvents = Collections2.transform(events,
new Function<InputDataInformationEvent, TezEvent>() {
@Override
public TezEvent apply(InputDataInformationEvent riEvent) {
- TezEvent tezEvent = new TezEvent(riEvent, rootEventSourceMetadata);
+ TezEvent tezEvent = new TezEvent(riEvent, rootEventSourceMetadata, currTime);
tezEvent.setDestinationInfo(getDestinationMetaData(inputName));
return tezEvent;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
index a6b403d..7d6da8a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
@@ -32,5 +32,6 @@ public interface AMContainer extends EventHandler<AMContainerEvent>{
public Container getContainer();
public List<TezTaskAttemptID> getAllTaskAttempts();
public TezTaskAttemptID getCurrentTaskAttempt();
+ public long getCurrentTaskAttemptAllocationTime();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 330f2b7..9b90752 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -95,6 +95,7 @@ public class AMContainerImpl implements AMContainer {
private boolean nodeFailed = false;
private TezTaskAttemptID currentAttempt;
+ private long currentAttemptAllocationTime;
private List<TezTaskAttemptID> failedAssignments;
private boolean inError = false;
@@ -362,6 +363,16 @@ public class AMContainerImpl implements AMContainer {
}
}
+ @Override
+ public long getCurrentTaskAttemptAllocationTime() {
+ readLock.lock();
+ try {
+ return this.currentAttemptAllocationTime;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
public boolean isInErrorState() {
return inError;
}
@@ -532,6 +543,7 @@ public class AMContainerImpl implements AMContainer {
// Register the additional resources back for this container.
container.containerLocalResources.putAll(container.additionalLocalResources);
container.currentAttempt = event.getTaskAttemptId();
+ container.currentAttemptAllocationTime = container.appContext.getClock().getTime();
if (LOG.isDebugEnabled()) {
LOG.debug("AssignTA: attempt: " + event.getRemoteTaskSpec());
LOG.debug("AdditionalLocalResources: " + container.additionalLocalResources);
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index af529bf..fbde635 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -21,16 +21,22 @@ package org.apache.tez.dag.history.events;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptFinishedProto;
public class TaskAttemptFinishedEvent implements HistoryEvent {
@@ -45,14 +51,16 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
private String diagnostics;
private TezCounters tezCounters;
private TaskAttemptTerminationCause error;
-
+ private List<DataEventDependencyInfo> dataEvents;
+
public TaskAttemptFinishedEvent(TezTaskAttemptID taId,
String vertexName,
long startTime,
long finishTime,
TaskAttemptState state,
TaskAttemptTerminationCause error,
- String diagnostics, TezCounters counters) {
+ String diagnostics, TezCounters counters,
+ List<DataEventDependencyInfo> dataEvents) {
this.taskAttemptId = taId;
this.vertexName = vertexName;
this.startTime = startTime;
@@ -61,6 +69,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
this.diagnostics = diagnostics;
this.tezCounters = counters;
this.error = error;
+ this.dataEvents = dataEvents;
}
public TaskAttemptFinishedEvent() {
@@ -80,7 +89,11 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
public boolean isHistoryEvent() {
return true;
}
-
+
+ public List<DataEventDependencyInfo> getDataEvents() {
+ return dataEvents;
+ }
+
public TaskAttemptFinishedProto toProto() {
TaskAttemptFinishedProto.Builder builder =
TaskAttemptFinishedProto.newBuilder();
@@ -96,6 +109,11 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
if (tezCounters != null) {
builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
}
+ if (dataEvents != null && !dataEvents.isEmpty()) {
+ for (DataEventDependencyInfo info : dataEvents) {
+ builder.addDataEvents(DataEventDependencyInfo.toProto(info));
+ }
+ }
return builder.build();
}
@@ -113,6 +131,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
proto.getCounters());
}
+ if (proto.getDataEventsCount() > 0) {
+ this.dataEvents = Lists.newArrayListWithCapacity(proto.getDataEventsCount());
+ for (DataEventDependencyInfoProto protoEvent : proto.getDataEventsList()) {
+ this.dataEvents.add(DataEventDependencyInfo.fromProto(protoEvent));
+ }
+ }
}
@Override
@@ -140,6 +164,8 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
+ ", status=" + state.name()
+ ", errorEnum=" + (error != null ? error.name() : "")
+ ", diagnostics=" + diagnostics
+ + ", lastDataEventSourceTA=" +
+ ((dataEvents==null) ? 0:dataEvents.size())
+ ", counters=" + (tezCounters == null ? "null" :
tezCounters.toString()
.replaceAll("\\n", ", ").replaceAll("\\s+", " "));
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index 36add86..4d15fb9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -36,24 +36,30 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
private String inProgressLogsUrl;
private String completedLogsUrl;
private String vertexName;
- private long startTime;
+ private long launchTime;
private ContainerId containerId;
private NodeId nodeId;
private String nodeHttpAddress;
+ private TezTaskAttemptID creationCausalTA;
+ private long creationTime;
+ private long allocationTime;
public TaskAttemptStartedEvent(TezTaskAttemptID taId,
- String vertexName, long startTime,
+ String vertexName, long launchTime,
ContainerId containerId, NodeId nodeId,
String inProgressLogsUrl, String completedLogsUrl,
- String nodeHttpAddress) {
+ String nodeHttpAddress, long creationTime, TezTaskAttemptID creationCausalTA, long allocationTime) {
this.taskAttemptId = taId;
this.vertexName = vertexName;
- this.startTime = startTime;
+ this.launchTime = launchTime;
this.containerId = containerId;
this.nodeId = nodeId;
this.inProgressLogsUrl = inProgressLogsUrl;
this.completedLogsUrl = completedLogsUrl;
this.nodeHttpAddress = nodeHttpAddress;
+ this.creationTime = creationTime;
+ this.creationCausalTA = creationCausalTA;
+ this.allocationTime = allocationTime;
}
public TaskAttemptStartedEvent() {
@@ -75,19 +81,29 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
}
public TaskAttemptStartedProto toProto() {
- return TaskAttemptStartedProto.newBuilder()
- .setTaskAttemptId(taskAttemptId.toString())
- .setStartTime(startTime)
+ TaskAttemptStartedProto.Builder builder = TaskAttemptStartedProto.newBuilder();
+ builder.setTaskAttemptId(taskAttemptId.toString())
+ .setStartTime(launchTime)
.setContainerId(containerId.toString())
.setNodeId(nodeId.toString())
- .build();
+ .setCreationTime(creationTime)
+ .setAllocationTime(allocationTime);
+ if (creationCausalTA != null) {
+ builder.setCreationCausalTA(creationCausalTA.toString());
+ }
+ return builder.build();
}
public void fromProto(TaskAttemptStartedProto proto) {
this.taskAttemptId = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
- this.startTime = proto.getStartTime();
+ this.launchTime = proto.getStartTime();
this.containerId = ConverterUtils.toContainerId(proto.getContainerId());
this.nodeId = ConverterUtils.toNodeId(proto.getNodeId());
+ this.creationTime = proto.getCreationTime();
+ this.allocationTime = proto.getAllocationTime();
+ if (proto.hasCreationCausalTA()) {
+ this.creationCausalTA = TezTaskAttemptID.fromString(proto.getCreationCausalTA());
+ }
}
@Override
@@ -108,7 +124,9 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
public String toString() {
return "vertexName=" + vertexName
+ ", taskAttemptId=" + taskAttemptId
- + ", startTime=" + startTime
+ + ", creationTime=" + creationTime
+ + ", allocationTime=" + allocationTime
+ + ", startTime=" + launchTime
+ ", containerId=" + containerId
+ ", nodeId=" + nodeId
+ ", inProgressLogs=" + inProgressLogsUrl
@@ -120,7 +138,19 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
}
public long getStartTime() {
- return startTime;
+ return launchTime;
+ }
+
+ public long getCreationTime() {
+ return creationTime;
+ }
+
+ public long getAllocationTime() {
+ return allocationTime;
+ }
+
+ public TezTaskAttemptID getCreationCausalTA() {
+ return creationCausalTA;
}
public ContainerId getContainerId() {
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
index 0310a26..6f44f33 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
@@ -145,6 +145,7 @@ public class VertexRecoverableEventsGeneratedEvent implements HistoryEvent {
if (event.getDestinationInfo() != null) {
evtBuilder.setDestinationInfo(convertEventMetaDataToProto(event.getDestinationInfo()));
}
+ evtBuilder.setEventTime(event.getEventReceivedTime());
tezEventProtos.add(evtBuilder.build());
}
}
@@ -184,7 +185,7 @@ public class VertexRecoverableEventsGeneratedEvent implements HistoryEvent {
if (eventProto.hasDestinationInfo()) {
destinationInfo = convertEventMetaDataFromProto(eventProto.getDestinationInfo());
}
- TezEvent tezEvent = new TezEvent(evt, sourceInfo);
+ TezEvent tezEvent = new TezEvent(evt, sourceInfo, eventProto.getEventTime());
tezEvent.setDestinationInfo(destinationInfo);
this.events.add(tezEvent);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 07ce2f3..411d677 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -530,6 +530,10 @@ public class HistoryEventJsonConversion {
otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
otherInfo.put(ATSConstants.COUNTERS,
DAGUtils.convertCountersToJSON(event.getCounters()));
+ if (event.getDataEvents() != null && !event.getDataEvents().isEmpty()) {
+ otherInfo.put(ATSConstants.LAST_DATA_EVENTS,
+ DAGUtils.convertDataEventDependencyInfoToJSON(event.getDataEvents()));
+ }
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
@@ -573,6 +577,11 @@ public class HistoryEventJsonConversion {
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
otherInfo.put(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
+ otherInfo.put(ATSConstants.CREATION_TIME, event.getCreationTime());
+ otherInfo.put(ATSConstants.ALLOCATION_TIME, event.getAllocationTime());
+ if (event.getCreationCausalTA() != null) {
+ otherInfo.put(ATSConstants.CREATION_CAUSAL_ATTEMPT, event.getCreationCausalTA().toString());
+ }
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 1447832..76e592e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
@@ -42,6 +43,8 @@ import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
import org.apache.tez.dag.app.dag.impl.VertexStats;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
+import org.apache.tez.dag.history.logging.EntityTypes;
import org.apache.tez.dag.records.TezTaskID;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@@ -99,6 +102,27 @@ public class DAGUtils {
}
return dagJson;
}
+
+ public static JSONObject convertDataEventDependencyInfoToJSON(List<DataEventDependencyInfo> info) {
+ return new JSONObject(convertDataEventDependecyInfoToATS(info));
+ }
+
+ public static Map<String, Object> convertDataEventDependecyInfoToATS(List<DataEventDependencyInfo> info) {
+ ArrayList<Object> infoList = new ArrayList<Object>();
+ for (DataEventDependencyInfo event : info) {
+ Map<String, Object> eventObj = new LinkedHashMap<String, Object>();
+ String id = "";
+ if (event.getTaskAttemptId() != null) {
+ id = event.getTaskAttemptId().toString();
+ }
+ eventObj.put(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), id);
+ eventObj.put(ATSConstants.TIMESTAMP, event.getTimestamp());
+ infoList.add(eventObj);
+ }
+ Map<String,Object> object = new LinkedHashMap<String, Object>();
+ putInto(object, ATSConstants.LAST_DATA_EVENTS, infoList);
+ return object;
+ }
public static JSONObject convertCountersToJSON(TezCounters counters)
throws JSONException {
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index 8af48b6..232f1b7 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -164,6 +164,14 @@ message TaskAttemptStartedProto {
optional int64 start_time = 2;
optional string container_id = 3;
optional string node_id = 4;
+ optional int64 creation_time = 5;
+ optional string creation_causal_t_a = 6;
+ optional int64 allocation_time = 7;
+}
+
+message DataEventDependencyInfoProto {
+ optional string task_attempt_id = 1;
+ optional int64 timestamp = 2;
}
message TaskAttemptFinishedProto {
@@ -173,6 +181,7 @@ message TaskAttemptFinishedProto {
optional string diagnostics = 4;
optional TezCountersProto counters = 5;
optional string error_enum = 6;
+ repeated DataEventDependencyInfoProto data_events = 7;
}
message EventMetaDataProto {
@@ -189,6 +198,7 @@ message TezDataMovementEventProto {
optional CompositeEventProto composite_data_movement_event = 4;
optional RootInputDataInformationEventProto root_input_data_information_event = 5;
optional RootInputInitializerEventProto input_initializer_event = 6;
+ optional int64 event_time = 7;
}
message VertexDataMovementEventsGeneratedProto {
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 08f6ff6..8fa57d3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -114,7 +114,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
}
public static interface EventsDelegate {
- public void getEvents(TaskSpec taskSpec, List<TezEvent> events);
+ public void getEvents(TaskSpec taskSpec, List<TezEvent> events, long time);
}
// mock container launcher does not launch real tasks.
@@ -408,7 +408,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
List<TezEvent> events = Lists.newArrayListWithCapacity(
cData.taskSpec.getOutputs().size() + 1);
if (cData.numUpdates == 0 && eventsDelegate != null) {
- eventsDelegate.getEvents(cData.taskSpec, events);
+ eventsDelegate.getEvents(cData.taskSpec, events, getContext().getClock().getTime());
}
TezCounters counters = null;
if (countersDelegate != null) {
@@ -422,7 +422,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
float maxUpdates = (updatesToMake != null) ? updatesToMake.intValue() : 1;
float progress = updateProgress ? cData.numUpdates/maxUpdates : 0f;
events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats), new EventMetaData(
- EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId)));
+ EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId),
+ getContext().getClock().getTime()));
TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events,
cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 50000);
doHeartbeat(request, cData);
@@ -433,7 +434,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
cData.completed = true;
List<TezEvent> events = Collections.singletonList(new TezEvent(
new TaskAttemptCompletedEvent(), new EventMetaData(
- EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId)));
+ EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId),
+ getContext().getClock().getTime()));
TezHeartbeatRequest request = new TezHeartbeatRequest(++cData.numUpdates, events,
cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 10000);
doHeartbeat(request, cData);
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 4137d42..42d4b0b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -126,17 +126,17 @@ public class TestMockDAGAppMaster {
static class TestEventsDelegate implements EventsDelegate {
@Override
- public void getEvents(TaskSpec taskSpec, List<TezEvent> events) {
+ public void getEvents(TaskSpec taskSpec, List<TezEvent> events, long time) {
for (OutputSpec output : taskSpec.getOutputs()) {
if (output.getPhysicalEdgeCount() == 1) {
events.add(new TezEvent(DataMovementEvent.create(0, 0, 0, null), new EventMetaData(
EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), output
- .getDestinationVertexName(), taskSpec.getTaskAttemptID())));
+ .getDestinationVertexName(), taskSpec.getTaskAttemptID()), time));
} else {
events.add(new TezEvent(CompositeDataMovementEvent.create(0,
output.getPhysicalEdgeCount(), null), new EventMetaData(
EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), output
- .getDestinationVertexName(), taskSpec.getTaskAttemptID())));
+ .getDestinationVertexName(), taskSpec.getTaskAttemptID()), time));
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 5c24ecc..641f3a2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -47,6 +47,7 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.VertexEventType;
@@ -58,6 +59,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventType;
@@ -99,12 +101,15 @@ public class TestTaskAttemptListenerImplTezDag {
eventHandler = mock(EventHandler.class);
+ MockClock clock = new MockClock();
+
appContext = mock(AppContext.class);
doReturn(eventHandler).when(appContext).getEventHandler();
doReturn(dag).when(appContext).getCurrentDAG();
doReturn(appAcls).when(appContext).getApplicationACLs();
doReturn(amContainerMap).when(appContext).getAllContainers();
-
+ doReturn(clock).when(appContext).getClock();
+
taskAttemptListener = new TaskAttemptListenerImplForTest(appContext,
mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null);
@@ -198,6 +203,7 @@ public class TestTaskAttemptListenerImplTezDag {
final Event statusUpdateEvent = argAllValues.get(0);
assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE,
statusUpdateEvent.getType());
+ assertEquals(false, ((TaskAttemptEventStatusUpdate)statusUpdateEvent).getReadErrorReported());
final Event vertexEvent = argAllValues.get(1);
final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent;
@@ -207,9 +213,39 @@ public class TestTaskAttemptListenerImplTezDag {
vertexRouteEvent.getEvents().get(0).getEventType());
assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT,
vertexRouteEvent.getEvents().get(1).getEventType());
+ }
+
+ @Test (timeout = 5000)
+ public void testTaskEventRoutingWithReadError() throws Exception {
+ List<TezEvent> events = Arrays.asList(
+ new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null),
+ new TezEvent(InputReadErrorEvent.create("", 0, 0), null),
+ new TezEvent(new TaskAttemptCompletedEvent(), null)
+ );
+
+ generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>());
+
+ ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+ verify(eventHandler, times(2)).handle(arg.capture());
+ final List<Event> argAllValues = arg.getAllValues();
+
+ final Event statusUpdateEvent = argAllValues.get(0);
+ assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE,
+ statusUpdateEvent.getType());
+ assertEquals(true, ((TaskAttemptEventStatusUpdate)statusUpdateEvent).getReadErrorReported());
+
+ final Event vertexEvent = argAllValues.get(1);
+ final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent;
+ assertEquals("First event should be routed to vertex", VertexEventType.V_ROUTE_EVENT,
+ vertexEvent.getType());
+ assertEquals(EventType.INPUT_READ_ERROR_EVENT,
+ vertexRouteEvent.getEvents().get(0).getEventType());
+ assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT,
+ vertexRouteEvent.getEvents().get(1).getEventType());
}
+
@Test (timeout = 5000)
public void testTaskEventRoutingTaskAttemptOnly() throws Exception {
List<TezEvent> events = Arrays.asList(
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 5d05fa3..1a1cb11 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -587,6 +588,94 @@ public class TestTaskAttempt {
}
@Test(timeout = 5000)
+ public void testLastDataEventRecording() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(1, 2);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+ appId, 0);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+ TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+
+ MockEventHandler eventHandler = spy(new MockEventHandler());
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(
+ new InetSocketAddress("localhost", 0));
+
+ Configuration taskConf = new Configuration();
+ taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ taskConf.setBoolean("fs.file.impl.disable.cache", true);
+ taskConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true);
+
+ locationHint = TaskLocationHint.createTaskLocationHint(
+ new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
+ Resource resource = Resource.newInstance(1024, 1);
+
+ NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ @SuppressWarnings("deprecation")
+ ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+ AMContainerMap containers = new AMContainerMap(
+ mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ new ContainerContextMatcher(), appCtx);
+ containers.addContainerIfNew(container);
+
+ doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+ doReturn(containers).when(appCtx).getAllContainers();
+
+ TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+ TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+ taListener, taskConf, new SystemClock(),
+ mockHeartbeatHandler, appCtx, false,
+ resource, createFakeContainerContext(), false);
+ TezTaskAttemptID taskAttemptID = taImpl.getID();
+
+ taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
+ // At state STARTING.
+ taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
+ null));
+ assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
+ TaskAttemptState.RUNNING);
+
+ long ts1 = 1024;
+ long ts2 = 2048;
+ TezTaskAttemptID mockId1 = mock(TezTaskAttemptID.class);
+ TezTaskAttemptID mockId2 = mock(TezTaskAttemptID.class);
+ TezEvent mockTezEvent1 = mock(TezEvent.class, RETURNS_DEEP_STUBS);
+ when(mockTezEvent1.getEventReceivedTime()).thenReturn(ts1);
+ when(mockTezEvent1.getSourceInfo().getTaskAttemptID()).thenReturn(mockId1);
+ TezEvent mockTezEvent2 = mock(TezEvent.class, RETURNS_DEEP_STUBS);
+ when(mockTezEvent2.getEventReceivedTime()).thenReturn(ts2);
+ when(mockTezEvent2.getSourceInfo().getTaskAttemptID()).thenReturn(mockId2);
+ TaskAttemptEventStatusUpdate statusEvent =
+ new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null));
+
+ assertEquals(0, taImpl.lastDataEvents.size());
+ taImpl.setLastEventSent(mockTezEvent1);
+ assertEquals(1, taImpl.lastDataEvents.size());
+ assertEquals(ts1, taImpl.lastDataEvents.get(0).getTimestamp());
+ assertEquals(mockId1, taImpl.lastDataEvents.get(0).getTaskAttemptId());
+ taImpl.handle(statusEvent);
+ taImpl.setLastEventSent(mockTezEvent2);
+ assertEquals(1, taImpl.lastDataEvents.size());
+ assertEquals(ts2, taImpl.lastDataEvents.get(0).getTimestamp());
+ assertEquals(mockId2, taImpl.lastDataEvents.get(0).getTaskAttemptId()); // over-write earlier value
+ statusEvent.setReadErrorReported(true);
+ taImpl.handle(statusEvent);
+ taImpl.setLastEventSent(mockTezEvent1);
+ assertEquals(2, taImpl.lastDataEvents.size());
+ assertEquals(ts1, taImpl.lastDataEvents.get(1).getTimestamp());
+ assertEquals(mockId1, taImpl.lastDataEvents.get(1).getTaskAttemptId()); // add new event
+ taImpl.setLastEventSent(mockTezEvent2);
+ assertEquals(2, taImpl.lastDataEvents.size());
+ assertEquals(ts2, taImpl.lastDataEvents.get(1).getTimestamp());
+ assertEquals(mockId2, taImpl.lastDataEvents.get(1).getTaskAttemptId()); // over-write earlier value
+ }
+
+ @Test(timeout = 5000)
public void testFailure() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 2);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index 0665b1e..90274eb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -19,7 +19,6 @@
package org.apache.tez.dag.app.dag.impl;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -52,6 +51,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.HistoryEventType;
@@ -66,12 +66,16 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import com.google.common.collect.Lists;
+
@SuppressWarnings({ "unchecked", "rawtypes" })
public class TestTaskAttemptRecovery {
private TaskAttemptImpl ta;
private EventHandler mockEventHandler;
- private long startTime = System.currentTimeMillis();
+ private long creationTime = System.currentTimeMillis();
+ private long allocationTime = creationTime + 5000;
+ private long startTime = allocationTime + 5000;
private long finishTime = startTime + 5000;
private TezTaskAttemptID taId;
@@ -153,9 +157,14 @@ public class TestTaskAttemptRecovery {
}
private void restoreFromTAStartEvent() {
+ TezTaskAttemptID causalId = TezTaskAttemptID.getInstance(taId.getTaskID(), taId.getId()+1);
TaskAttemptState recoveredState =
ta.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
- startTime, mock(ContainerId.class), mock(NodeId.class), "", "", ""));
+ startTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", creationTime, causalId,
+ allocationTime));
+ assertEquals(causalId, ta.getCreationCausalAttempt());
+ assertEquals(creationTime, ta.getCreationTime());
+ assertEquals(allocationTime, ta.getAllocationTime());
assertEquals(startTime, ta.getLaunchTime());
assertEquals(TaskAttemptState.RUNNING, recoveredState);
}
@@ -169,9 +178,14 @@ public class TestTaskAttemptRecovery {
errorEnum = TaskAttemptTerminationCause.APPLICATION_ERROR;
}
+ long lastDataEventTime = 1024;
+ TezTaskAttemptID lastDataEventTA = mock(TezTaskAttemptID.class);
+ List<DataEventDependencyInfo> events = Lists.newLinkedList();
+ events.add(new DataEventDependencyInfo(lastDataEventTime, lastDataEventTA));
+ events.add(new DataEventDependencyInfo(lastDataEventTime, lastDataEventTA));
TaskAttemptState recoveredState =
ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- startTime, finishTime, state, errorEnum, diag, counters));
+ startTime, finishTime, state, errorEnum, diag, counters, events));
assertEquals(startTime, ta.getLaunchTime());
assertEquals(finishTime, ta.getFinishTime());
assertEquals(counters, ta.reportedStatus.counters);
@@ -180,6 +194,9 @@ public class TestTaskAttemptRecovery {
assertEquals(1, ta.getDiagnostics().size());
assertEquals(diag, ta.getDiagnostics().get(0));
assertEquals(state, recoveredState);
+ assertEquals(events.size(), ta.lastDataEvents.size());
+ assertEquals(lastDataEventTime, ta.lastDataEvents.get(0).getTimestamp());
+ assertEquals(lastDataEventTA, ta.lastDataEvents.get(0).getTaskAttemptId());
if (state != TaskAttemptState.SUCCEEDED) {
assertEquals(errorEnum, ta.getTerminationCause());
} else {
@@ -304,7 +321,7 @@ public class TestTaskAttemptRecovery {
TaskAttemptState recoveredState =
ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
startTime, finishTime, TaskAttemptState.KILLED,
- TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters()));
+ TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters(), null));
assertEquals(TaskAttemptState.KILLED, recoveredState);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 87dd2fa..1ee6c4e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -56,6 +56,7 @@ import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.TaskStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
import org.apache.tez.dag.app.dag.event.TaskEventTermination;
@@ -72,6 +73,7 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -359,7 +361,10 @@ public class TestTaskImpl {
LOG.info("--- START: testKillScheduledTaskAttempt ---");
TezTaskID taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
+ TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID();
killScheduledTaskAttempt(mockTask.getLastAttempt().getID());
+ // last killed attempt should be causal TA of next attempt
+ Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA());
}
@Test(timeout = 5000)
@@ -383,8 +388,11 @@ public class TestTaskImpl {
LOG.info("--- START: testKillRunningTaskAttempt ---");
TezTaskID taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
+ TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID();
launchTaskAttempt(mockTask.getLastAttempt().getID());
killRunningTaskAttempt(mockTask.getLastAttempt().getID());
+ // last killed attempt should be causal TA of next attempt
+ Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA());
}
/**
@@ -505,11 +513,15 @@ public class TestTaskImpl {
// During the task attempt commit there is an exception which causes
// the attempt to fail
+ TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID();
updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.FAILED);
+ assertEquals(1, mockTask.getAttemptList().size());
failRunningTaskAttempt(mockTask.getLastAttempt().getID());
assertEquals(2, mockTask.getAttemptList().size());
assertEquals(1, mockTask.failedAttempts);
+ // last failed attempt should be the causal TA
+ Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA());
assertFalse("First attempt should not commit",
mockTask.canCommit(mockTask.getAttemptList().get(0).getID()));
@@ -553,6 +565,7 @@ public class TestTaskImpl {
scheduleTaskAttempt(taskId);
launchTaskAttempt(mockTask.getLastAttempt().getID());
updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
+ TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID();
// Add a speculative task attempt that succeeds
mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
@@ -560,6 +573,11 @@ public class TestTaskImpl {
launchTaskAttempt(mockTask.getLastAttempt().getID());
updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
+ assertEquals(2, mockTask.getAttemptList().size());
+
+ // previous running attempt should be the causal TA of this speculative attempt
+ Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA());
+
assertTrue("Second attempt should commit",
mockTask.canCommit(mockTask.getAttemptList().get(1).getID()));
assertFalse("First attempt should not commit",
@@ -602,8 +620,14 @@ public class TestTaskImpl {
eventHandler.events.clear();
// Now fail the attempt after it has succeeded
+ TezTaskAttemptID mockDestId = mock(TezTaskAttemptID.class);
+ TezEvent mockTezEvent = mock(TezEvent.class);
+ EventMetaData meta = new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", mockDestId);
+ when(mockTezEvent.getSourceInfo()).thenReturn(meta);
+ TaskAttemptEventOutputFailed outputFailedEvent =
+ new TaskAttemptEventOutputFailed(mockDestId, mockTezEvent, 1);
mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt()
- .getID(), TaskEventType.T_ATTEMPT_FAILED));
+ .getID(), TaskEventType.T_ATTEMPT_FAILED, outputFailedEvent));
// The task should still be in the scheduled state
assertTaskScheduledState();
@@ -611,6 +635,12 @@ public class TestTaskImpl {
Assert.assertEquals(AMNodeEventType.N_TA_ENDED, event.getType());
event = eventHandler.events.get(eventHandler.events.size()-1);
Assert.assertEquals(VertexEventType.V_TASK_RESCHEDULED, event.getType());
+
+ // the attempt that reported the output read error should be the causal TA
+ List<MockTaskAttemptImpl> attempts = mockTask.getAttemptList();
+ Assert.assertEquals(2, attempts.size());
+ MockTaskAttemptImpl newAttempt = attempts.get(1);
+ Assert.assertEquals(mockDestId, newAttempt.getSchedulingCausalTA());
}
@Test(timeout = 5000)
@@ -695,11 +725,11 @@ public class TestTaskImpl {
}
@Override
- protected TaskAttemptImpl createAttempt(int attemptNumber) {
+ protected TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedCausalTA) {
MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(),
attemptNumber, eventHandler, taskAttemptListener,
conf, clock, taskHeartbeatHandler, appContext,
- true, taskResource, containerContext);
+ true, taskResource, containerContext, schedCausalTA);
taskAttempts.add(attempt);
return attempt;
}
@@ -746,9 +776,10 @@ public class TestTaskImpl {
EventHandler eventHandler, TaskAttemptListener tal, Configuration conf,
Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
boolean isRescheduled,
- Resource resource, ContainerContext containerContext) {
+ Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) {
super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh,
- appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class));
+ appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class),
+ schedCausalTA);
}
@Override