You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/09/08 01:50:52 UTC
tez git commit: TEZ-2778. Improvements to handle multiple read errors with complex DAGs (bikas)
Repository: tez
Updated Branches:
refs/heads/master db725e568 -> 171d48504
TEZ-2778. Improvements to handle multiple read errors with complex DAGs (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/171d4850
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/171d4850
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/171d4850
Branch: refs/heads/master
Commit: 171d4850443a9e67aa5a68d4c8482779c0caa8eb
Parents: db725e5
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Sep 7 16:50:33 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Sep 7 16:50:33 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/common/ATSConstants.java | 3 +-
.../tez/dag/app/TaskCommunicatorManager.java | 13 +-
.../dag/event/TaskAttemptEventStatusUpdate.java | 9 ++
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 72 +++++++--
.../events/TaskAttemptFinishedEvent.java | 43 ++---
.../impl/HistoryEventJsonConversion.java | 6 +-
.../apache/tez/dag/history/utils/DAGUtils.java | 24 +++
tez-dag/src/main/proto/HistoryEvents.proto | 8 +-
.../dag/app/TestTaskCommunicatorManager1.java | 33 ++++
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 87 ++++++++++
.../app/dag/impl/TestTaskAttemptRecovery.java | 15 +-
.../tez/dag/app/dag/impl/TestTaskRecovery.java | 34 ++--
.../TestHistoryEventsProtoConversion.java | 18 ++-
.../impl/TestHistoryEventJsonConversion.java | 2 +-
.../parser/datamodel/TaskAttemptInfo.java | 56 +++++--
.../apache/tez/history/parser/utils/Utils.java | 18 +++
.../apache/tez/history/TestHistoryParser.java | 10 +-
.../ats/HistoryEventTimelineConversion.java | 7 +-
.../ats/TestHistoryEventTimelineConversion.java | 21 ++-
.../analyzer/plugins/CriticalPathAnalyzer.java | 157 +++++++------------
.../org/apache/tez/analyzer/TestAnalyzer.java | 32 ++--
22 files changed, 462 insertions(+), 207 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9df51c0..bce05c0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2778. Improvements to handle multiple read errors with complex DAGs
TEZ-2768. Log a useful error message when the summary stream cannot be closed when shutting
down an AM.
TEZ-2745. ClassNotFoundException of user code should fail dag
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index ad9270f..f786a4e 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -84,8 +84,7 @@ public class ATSConstants {
public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL";
public static final String COMPLETED_LOGS_URL = "completedLogsURL";
public static final String EXIT_STATUS = "exitStatus";
- public static final String LAST_DATA_EVENT_TIME = "lastDataEventTime";
- public static final String LAST_DATA_EVENT_SOURCE_TA = "lastDataEventSourceTA";
+ public static final String LAST_DATA_EVENTS = "lastDataEvents";
public static final String UPDATED_EDGE_MANAGERS = "updatedEdgeManagers";
public static final String CREATION_CAUSAL_ATTEMPT = "creationCausalAttempt";
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
index cfb34ac..2cc6ae2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -35,7 +35,6 @@ import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventType;
@@ -234,19 +233,27 @@ public class TaskCommunicatorManager extends AbstractService implements
// to VertexImpl to ensure the events ordering
// 1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent
// 2. TaskStatusEvent is handled before TaskAttemptFinishedEvent
+ TaskAttemptEventStatusUpdate taskAttemptEvent = null;
+ boolean readErrorReported = false;
for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
// for now, set the event time on the AM when it is received.
// this avoids any time disparity between machines.
tezEvent.setEventReceivedTime(currTime);
final EventType eventType = tezEvent.getEventType();
if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
- TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
+ taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
(TaskStatusUpdateEvent) tezEvent.getEvent());
- context.getEventHandler().handle(taskAttemptEvent);
} else {
+ if (eventType == EventType.INPUT_READ_ERROR_EVENT) {
+ readErrorReported = true;
+ }
otherEvents.add(tezEvent);
}
}
+ if (taskAttemptEvent != null) {
+ taskAttemptEvent.setReadErrorReported(readErrorReported);
+ context.getEventHandler().handle(taskAttemptEvent);
+ }
if(!otherEvents.isEmpty()) {
TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
context.getEventHandler().handle(
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
index c5a6ea7..458679c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
@@ -24,6 +24,7 @@ import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent {
private TaskStatusUpdateEvent taskAttemptStatus;
+ private boolean readErrorReported = false;
public TaskAttemptEventStatusUpdate(TezTaskAttemptID id,
TaskStatusUpdateEvent statusEvent) {
@@ -34,4 +35,12 @@ public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent {
public TaskStatusUpdateEvent getStatusEvent() {
return this.taskAttemptStatus;
}
+
+ public void setReadErrorReported(boolean value) {
+ readErrorReported = value;
+ }
+
+ public boolean getReadErrorReported() {
+ return readErrorReported;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index e57c827..003e05f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -97,6 +97,7 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto;
import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
@@ -119,6 +120,37 @@ public class TaskAttemptImpl implements TaskAttempt,
private static final Logger LOG = LoggerFactory.getLogger(TaskAttemptImpl.class);
private static final String LINE_SEPARATOR = System
.getProperty("line.separator");
+
+ public static class DataEventDependencyInfo {
+ long timestamp;
+ TezTaskAttemptID taId;
+ public DataEventDependencyInfo(long time, TezTaskAttemptID id) {
+ this.timestamp = time;
+ this.taId = id;
+ }
+ public long getTimestamp() {
+ return timestamp;
+ }
+ public TezTaskAttemptID getTaskAttemptId() {
+ return taId;
+ }
+ public static DataEventDependencyInfoProto toProto(DataEventDependencyInfo info) {
+ DataEventDependencyInfoProto.Builder builder = DataEventDependencyInfoProto.newBuilder();
+ builder.setTimestamp(info.timestamp);
+ if (info.taId != null) {
+ builder.setTaskAttemptId(info.taId.toString());
+ }
+ return builder.build();
+ }
+
+ public static DataEventDependencyInfo fromProto(DataEventDependencyInfoProto proto) {
+ TezTaskAttemptID taId = null;
+ if(proto.hasTaskAttemptId()) {
+ taId = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
+ }
+ return new DataEventDependencyInfo(proto.getTimestamp(), taId);
+ }
+ }
static final TezCounters EMPTY_COUNTERS = new TezCounters();
@@ -150,8 +182,8 @@ public class TaskAttemptImpl implements TaskAttempt,
private final Vertex vertex;
@VisibleForTesting
- long lastDataEventTime;
- TezTaskAttemptID lastDataEventSourceTA = null;
+ boolean appendNextDataEvent = true;
+ ArrayList<DataEventDependencyInfo> lastDataEvents = Lists.newArrayList();
@VisibleForTesting
TaskAttemptStatus reportedStatus;
@@ -822,8 +854,9 @@ public class TaskAttemptImpl implements TaskAttempt,
: TaskAttemptTerminationCause.UNKNOWN_ERROR;
this.diagnostics.add(tEvent.getDiagnostics());
this.recoveredState = tEvent.getState();
- this.lastDataEventTime = tEvent.getLastDataEventTime();
- this.lastDataEventSourceTA = tEvent.getLastDataEventSourceTA();
+ if (tEvent.getDataEvents() != null) {
+ this.lastDataEvents.addAll(tEvent.getDataEvents());
+ }
sendEvent(createDAGCounterUpdateEventTAFinished(this, tEvent.getState()));
return recoveredState;
}
@@ -1040,7 +1073,7 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
attemptId, getVertex().getName(), getLaunchTime(),
getFinishTime(), TaskAttemptState.SUCCEEDED, null,
- "", getCounters(), lastDataEventTime, lastDataEventSourceTA);
+ "", getCounters(), lastDataEvents);
// FIXME how do we store information regd completion events
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1057,8 +1090,7 @@ public class TaskAttemptImpl implements TaskAttempt,
finishTime, state,
terminationCause,
StringUtils.join(
- getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEventTime,
- lastDataEventSourceTA);
+ getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEvents);
// FIXME how do we store information regd completion events
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1324,12 +1356,16 @@ public class TaskAttemptImpl implements TaskAttempt,
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
- TaskStatusUpdateEvent statusEvent = ((TaskAttemptEventStatusUpdate) event)
- .getStatusEvent();
+ TaskAttemptEventStatusUpdate sEvent = (TaskAttemptEventStatusUpdate) event;
+ TaskStatusUpdateEvent statusEvent = sEvent.getStatusEvent();
ta.reportedStatus.state = ta.getState();
ta.reportedStatus.progress = statusEvent.getProgress();
ta.reportedStatus.counters = statusEvent.getCounters();
ta.statistics = statusEvent.getStatistics();
+ if (sEvent.getReadErrorReported()) {
+ // if there is a read error then track the next last data event
+ ta.appendNextDataEvent = true;
+ }
ta.updateProgressSplits();
@@ -1655,8 +1691,20 @@ public class TaskAttemptImpl implements TaskAttempt,
@Override
public void setLastEventSent(TezEvent lastEventSent) {
- // task attempt id may be null for input data information events
- this.lastDataEventSourceTA = lastEventSent.getSourceInfo().getTaskAttemptID();
- this.lastDataEventTime = lastEventSent.getEventReceivedTime();
+ writeLock.lock();
+ try {
+ DataEventDependencyInfo info = new DataEventDependencyInfo(
+ lastEventSent.getEventReceivedTime(), lastEventSent.getSourceInfo().getTaskAttemptID());
+ // task attempt id may be null for input data information events
+ if (appendNextDataEvent) {
+ appendNextDataEvent = false;
+ lastDataEvents.add(info);
+ } else {
+ // over-write last event - array list makes it quick
+ lastDataEvents.set(lastDataEvents.size() - 1, info);
+ }
+ } finally {
+ writeLock.unlock();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 52761e2..fbde635 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -21,16 +21,22 @@ package org.apache.tez.dag.history.events;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptFinishedProto;
public class TaskAttemptFinishedEvent implements HistoryEvent {
@@ -45,9 +51,8 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
private String diagnostics;
private TezCounters tezCounters;
private TaskAttemptTerminationCause error;
- private TezTaskAttemptID lastDataEventSourceTA;
- private long lastDataEventTime;
-
+ private List<DataEventDependencyInfo> dataEvents;
+
public TaskAttemptFinishedEvent(TezTaskAttemptID taId,
String vertexName,
long startTime,
@@ -55,8 +60,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
TaskAttemptState state,
TaskAttemptTerminationCause error,
String diagnostics, TezCounters counters,
- long lastDataEventTime,
- TezTaskAttemptID lastDataEventSourceTA) {
+ List<DataEventDependencyInfo> dataEvents) {
this.taskAttemptId = taId;
this.vertexName = vertexName;
this.startTime = startTime;
@@ -65,8 +69,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
this.diagnostics = diagnostics;
this.tezCounters = counters;
this.error = error;
- this.lastDataEventTime = lastDataEventTime;
- this.lastDataEventSourceTA = lastDataEventSourceTA;
+ this.dataEvents = dataEvents;
}
public TaskAttemptFinishedEvent() {
@@ -87,14 +90,10 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
return true;
}
- public long getLastDataEventTime() {
- return lastDataEventTime;
+ public List<DataEventDependencyInfo> getDataEvents() {
+ return dataEvents;
}
- public TezTaskAttemptID getLastDataEventSourceTA() {
- return lastDataEventSourceTA;
- }
-
public TaskAttemptFinishedProto toProto() {
TaskAttemptFinishedProto.Builder builder =
TaskAttemptFinishedProto.newBuilder();
@@ -110,9 +109,10 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
if (tezCounters != null) {
builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
}
- if (lastDataEventSourceTA != null) {
- builder.setLastDataEventSourceTA(lastDataEventSourceTA.toString());
- builder.setLastDataEventTime(lastDataEventTime);
+ if (dataEvents != null && !dataEvents.isEmpty()) {
+ for (DataEventDependencyInfo info : dataEvents) {
+ builder.addDataEvents(DataEventDependencyInfo.toProto(info));
+ }
}
return builder.build();
}
@@ -131,9 +131,11 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
proto.getCounters());
}
- if (proto.hasLastDataEventSourceTA()) {
- this.lastDataEventSourceTA = TezTaskAttemptID.fromString(proto.getLastDataEventSourceTA());
- this.lastDataEventTime = proto.getLastDataEventTime();
+ if (proto.getDataEventsCount() > 0) {
+ this.dataEvents = Lists.newArrayListWithCapacity(proto.getDataEventsCount());
+ for (DataEventDependencyInfoProto protoEvent : proto.getDataEventsList()) {
+ this.dataEvents.add(DataEventDependencyInfo.fromProto(protoEvent));
+ }
}
}
@@ -163,8 +165,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
+ ", errorEnum=" + (error != null ? error.name() : "")
+ ", diagnostics=" + diagnostics
+ ", lastDataEventSourceTA=" +
- ((lastDataEventSourceTA==null) ? null:lastDataEventSourceTA.toString())
- + ", lastDataEventTime=" + lastDataEventTime
+ ((dataEvents==null) ? 0:dataEvents.size())
+ ", counters=" + (tezCounters == null ? "null" :
tezCounters.toString()
.replaceAll("\\n", ", ").replaceAll("\\s+", " "));
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index b32b324..411d677 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -530,9 +530,9 @@ public class HistoryEventJsonConversion {
otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
otherInfo.put(ATSConstants.COUNTERS,
DAGUtils.convertCountersToJSON(event.getCounters()));
- otherInfo.put(ATSConstants.LAST_DATA_EVENT_TIME, event.getLastDataEventTime());
- if (event.getLastDataEventSourceTA() != null) {
- otherInfo.put(ATSConstants.LAST_DATA_EVENT_SOURCE_TA, event.getLastDataEventSourceTA().toString());
+ if (event.getDataEvents() != null && !event.getDataEvents().isEmpty()) {
+ otherInfo.put(ATSConstants.LAST_DATA_EVENTS,
+ DAGUtils.convertDataEventDependencyInfoToJSON(event.getDataEvents()));
}
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 1447832..76e592e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
@@ -42,6 +43,8 @@ import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
import org.apache.tez.dag.app.dag.impl.VertexStats;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
+import org.apache.tez.dag.history.logging.EntityTypes;
import org.apache.tez.dag.records.TezTaskID;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@@ -99,6 +102,27 @@ public class DAGUtils {
}
return dagJson;
}
+
+ public static JSONObject convertDataEventDependencyInfoToJSON(List<DataEventDependencyInfo> info) {
+ return new JSONObject(convertDataEventDependecyInfoToATS(info));
+ }
+
+ public static Map<String, Object> convertDataEventDependecyInfoToATS(List<DataEventDependencyInfo> info) {
+ ArrayList<Object> infoList = new ArrayList<Object>();
+ for (DataEventDependencyInfo event : info) {
+ Map<String, Object> eventObj = new LinkedHashMap<String, Object>();
+ String id = "";
+ if (event.getTaskAttemptId() != null) {
+ id = event.getTaskAttemptId().toString();
+ }
+ eventObj.put(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), id);
+ eventObj.put(ATSConstants.TIMESTAMP, event.getTimestamp());
+ infoList.add(eventObj);
+ }
+ Map<String,Object> object = new LinkedHashMap<String, Object>();
+ putInto(object, ATSConstants.LAST_DATA_EVENTS, infoList);
+ return object;
+ }
public static JSONObject convertCountersToJSON(TezCounters counters)
throws JSONException {
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index e268e0d..232f1b7 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -169,6 +169,11 @@ message TaskAttemptStartedProto {
optional int64 allocation_time = 7;
}
+message DataEventDependencyInfoProto {
+ optional string task_attempt_id = 1;
+ optional int64 timestamp = 2;
+}
+
message TaskAttemptFinishedProto {
optional string task_attempt_id = 1;
optional int64 finish_time = 2;
@@ -176,8 +181,7 @@ message TaskAttemptFinishedProto {
optional string diagnostics = 4;
optional TezCountersProto counters = 5;
optional string error_enum = 6;
- optional int64 last_data_event_time = 7;
- optional string last_data_event_source_t_a = 8;
+ repeated DataEventDependencyInfoProto data_events = 7;
}
message EventMetaDataProto {
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
index 117d3b3..35893b3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
@@ -68,6 +68,7 @@ import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.dag.api.TaskCommunicatorContext;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.VertexEventType;
@@ -79,6 +80,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventType;
@@ -247,6 +249,7 @@ public class TestTaskCommunicatorManager1 {
final Event statusUpdateEvent = argAllValues.get(0);
assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE,
statusUpdateEvent.getType());
+ assertEquals(false, ((TaskAttemptEventStatusUpdate)statusUpdateEvent).getReadErrorReported());
final Event vertexEvent = argAllValues.get(1);
final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent;
@@ -256,9 +259,39 @@ public class TestTaskCommunicatorManager1 {
vertexRouteEvent.getEvents().get(0).getEventType());
assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT,
vertexRouteEvent.getEvents().get(1).getEventType());
+ }
+
+ @Test (timeout = 5000)
+ public void testTaskEventRoutingWithReadError() throws Exception {
+ List<TezEvent> events = Arrays.asList(
+ new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null),
+ new TezEvent(InputReadErrorEvent.create("", 0, 0), null),
+ new TezEvent(new TaskAttemptCompletedEvent(), null)
+ );
+
+ generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>());
+
+ ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+ verify(eventHandler, times(2)).handle(arg.capture());
+ final List<Event> argAllValues = arg.getAllValues();
+
+ final Event statusUpdateEvent = argAllValues.get(0);
+ assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE,
+ statusUpdateEvent.getType());
+ assertEquals(true, ((TaskAttemptEventStatusUpdate)statusUpdateEvent).getReadErrorReported());
+
+ final Event vertexEvent = argAllValues.get(1);
+ final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent;
+ assertEquals("First event should be routed to vertex", VertexEventType.V_ROUTE_EVENT,
+ vertexEvent.getType());
+ assertEquals(EventType.INPUT_READ_ERROR_EVENT,
+ vertexRouteEvent.getEvents().get(0).getEventType());
+ assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT,
+ vertexRouteEvent.getEvents().get(1).getEventType());
}
+
@Test (timeout = 5000)
public void testTaskEventRoutingTaskAttemptOnly() throws Exception {
List<TezEvent> events = Arrays.asList(
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 101b22f..2d30a6f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -584,6 +585,92 @@ public class TestTaskAttempt {
}
@Test(timeout = 5000)
+ public void testLastDataEventRecording() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(1, 2);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+ appId, 0);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+ TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+
+ MockEventHandler eventHandler = spy(new MockEventHandler());
+ TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
+
+ Configuration taskConf = new Configuration();
+ taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ taskConf.setBoolean("fs.file.impl.disable.cache", true);
+ taskConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true);
+
+ locationHint = TaskLocationHint.createTaskLocationHint(
+ new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
+ Resource resource = Resource.newInstance(1024, 1);
+
+ NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ @SuppressWarnings("deprecation")
+ ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+ AMContainerMap containers = new AMContainerMap(
+ mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
+ new ContainerContextMatcher(), appCtx);
+ containers.addContainerIfNew(container, 0, 0, 0);
+
+ doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+ doReturn(containers).when(appCtx).getAllContainers();
+
+ TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+ TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+ taListener, taskConf, new SystemClock(),
+ mockHeartbeatHandler, appCtx, false,
+ resource, createFakeContainerContext(), false);
+ TezTaskAttemptID taskAttemptID = taImpl.getID();
+
+ taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
+ // At state STARTING.
+ taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
+ null));
+ assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
+ TaskAttemptState.RUNNING);
+
+ long ts1 = 1024;
+ long ts2 = 2048;
+ TezTaskAttemptID mockId1 = mock(TezTaskAttemptID.class);
+ TezTaskAttemptID mockId2 = mock(TezTaskAttemptID.class);
+ TezEvent mockTezEvent1 = mock(TezEvent.class, RETURNS_DEEP_STUBS);
+ when(mockTezEvent1.getEventReceivedTime()).thenReturn(ts1);
+ when(mockTezEvent1.getSourceInfo().getTaskAttemptID()).thenReturn(mockId1);
+ TezEvent mockTezEvent2 = mock(TezEvent.class, RETURNS_DEEP_STUBS);
+ when(mockTezEvent2.getEventReceivedTime()).thenReturn(ts2);
+ when(mockTezEvent2.getSourceInfo().getTaskAttemptID()).thenReturn(mockId2);
+ TaskAttemptEventStatusUpdate statusEvent =
+ new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null));
+
+ assertEquals(0, taImpl.lastDataEvents.size());
+ taImpl.setLastEventSent(mockTezEvent1);
+ assertEquals(1, taImpl.lastDataEvents.size());
+ assertEquals(ts1, taImpl.lastDataEvents.get(0).getTimestamp());
+ assertEquals(mockId1, taImpl.lastDataEvents.get(0).getTaskAttemptId());
+ taImpl.handle(statusEvent);
+ taImpl.setLastEventSent(mockTezEvent2);
+ assertEquals(1, taImpl.lastDataEvents.size());
+ assertEquals(ts2, taImpl.lastDataEvents.get(0).getTimestamp());
+ assertEquals(mockId2, taImpl.lastDataEvents.get(0).getTaskAttemptId()); // over-write earlier value
+ statusEvent.setReadErrorReported(true);
+ taImpl.handle(statusEvent);
+ taImpl.setLastEventSent(mockTezEvent1);
+ assertEquals(2, taImpl.lastDataEvents.size());
+ assertEquals(ts1, taImpl.lastDataEvents.get(1).getTimestamp());
+ assertEquals(mockId1, taImpl.lastDataEvents.get(1).getTaskAttemptId()); // add new event
+ taImpl.setLastEventSent(mockTezEvent2);
+ assertEquals(2, taImpl.lastDataEvents.size());
+ assertEquals(ts2, taImpl.lastDataEvents.get(1).getTimestamp());
+ assertEquals(mockId2, taImpl.lastDataEvents.get(1).getTaskAttemptId()); // over-write earlier value
+ }
+
+ @Test(timeout = 5000)
public void testFailure() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 2);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index 6bbfc3d..53f1856 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -51,6 +51,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.HistoryEventType;
@@ -65,6 +66,8 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import com.google.common.collect.Lists;
+
@SuppressWarnings({ "unchecked", "rawtypes" })
public class TestTaskAttemptRecovery {
@@ -177,9 +180,12 @@ public class TestTaskAttemptRecovery {
long lastDataEventTime = 1024;
TezTaskAttemptID lastDataEventTA = mock(TezTaskAttemptID.class);
+ List<DataEventDependencyInfo> events = Lists.newLinkedList();
+ events.add(new DataEventDependencyInfo(lastDataEventTime, lastDataEventTA));
+ events.add(new DataEventDependencyInfo(lastDataEventTime, lastDataEventTA));
TaskAttemptState recoveredState =
ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- startTime, finishTime, state, errorEnum, diag, counters, lastDataEventTime, lastDataEventTA));
+ startTime, finishTime, state, errorEnum, diag, counters, events));
assertEquals(startTime, ta.getLaunchTime());
assertEquals(finishTime, ta.getFinishTime());
assertEquals(counters, ta.reportedStatus.counters);
@@ -188,8 +194,9 @@ public class TestTaskAttemptRecovery {
assertEquals(1, ta.getDiagnostics().size());
assertEquals(diag, ta.getDiagnostics().get(0));
assertEquals(state, recoveredState);
- assertEquals(lastDataEventTime, ta.lastDataEventTime);
- assertEquals(lastDataEventTA, ta.lastDataEventSourceTA);
+ assertEquals(events.size(), ta.lastDataEvents.size());
+ assertEquals(lastDataEventTime, ta.lastDataEvents.get(0).getTimestamp());
+ assertEquals(lastDataEventTA, ta.lastDataEvents.get(0).getTaskAttemptId());
if (state != TaskAttemptState.SUCCEEDED) {
assertEquals(errorEnum, ta.getTerminationCause());
} else {
@@ -314,7 +321,7 @@ public class TestTaskAttemptRecovery {
TaskAttemptState recoveredState =
ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
startTime, finishTime, TaskAttemptState.KILLED,
- TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters(), 0, null));
+ TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters(), null));
assertEquals(TaskAttemptState.KILLED, recoveredState);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index eca8274..b6d4c10 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -286,7 +286,7 @@ public class TestTaskRecovery {
restoreFromTaskStartEvent();
TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- 0L, 0L, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"", new TezCounters(), 0, null));
+ 0L, 0L, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"", new TezCounters(), null));
task.handle(new TaskEventRecoverTask(task.getTaskId()));
// wait for the second task attempt is scheduled
dispatcher.await();
@@ -307,7 +307,7 @@ public class TestTaskRecovery {
restoreFromTaskStartEvent();
TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- 0L, 0L, TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"", new TezCounters(), 0, null));
+ 0L, 0L, TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"", new TezCounters(), null));
task.handle(new TaskEventRecoverTask(task.getTaskId()));
// wait for the second task attempt is scheduled
dispatcher.await();
@@ -329,7 +329,7 @@ public class TestTaskRecovery {
TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
try {
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- 0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters(), 0, null));
+ 0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters(), null));
fail("Should fail due to no TaskAttemptStartedEvent but with TaskAttemptFinishedEvent(Succeeded)");
} catch (TezUncheckedException e) {
assertTrue(e.getMessage().contains("Could not find task attempt when trying to recover"));
@@ -372,7 +372,7 @@ public class TestTaskRecovery {
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
- "", new TezCounters(), 0, null));
+ "", new TezCounters(), null));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -405,7 +405,7 @@ public class TestTaskRecovery {
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.FAILED, null,
- "", new TezCounters(), 0, null));
+ "", new TezCounters(), null));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -438,7 +438,7 @@ public class TestTaskRecovery {
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
- "", new TezCounters(), 0, null));
+ "", new TezCounters(), null));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -473,7 +473,7 @@ public class TestTaskRecovery {
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
- "", new TezCounters(), 0, null));
+ "", new TezCounters(), null));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -516,7 +516,7 @@ public class TestTaskRecovery {
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
- "", new TezCounters(), 0, null));
+ "", new TezCounters(), null));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -528,7 +528,7 @@ public class TestTaskRecovery {
recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.FAILED, null,
- "", new TezCounters(), 0, null));
+ "", new TezCounters(), null));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -563,7 +563,7 @@ public class TestTaskRecovery {
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
- "", new TezCounters(), 0, null));
+ "", new TezCounters(), null));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -575,7 +575,7 @@ public class TestTaskRecovery {
recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
- "", new TezCounters(), 0, null));
+ "", new TezCounters(), null));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -614,7 +614,7 @@ public class TestTaskRecovery {
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
- "", new TezCounters(), 0, null));
+ "", new TezCounters(), null));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -654,7 +654,7 @@ public class TestTaskRecovery {
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
- "", new TezCounters(), 0, null));
+ "", new TezCounters(), null));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -735,7 +735,7 @@ public class TestTaskRecovery {
recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
- "", new TezCounters(), 0, null));
+ "", new TezCounters(), null));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(TaskAttemptStateInternal.NEW,
((TaskAttemptImpl) task.getAttempt(taId)).getInternalState());
@@ -776,7 +776,7 @@ public class TestTaskRecovery {
task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0));
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
- 0, TaskAttemptState.KILLED, null, "", null, 0, null));
+ 0, TaskAttemptState.KILLED, null, "", null, null));
}
assertEquals(maxFailedAttempts, task.getAttempts().size());
assertEquals(0, task.failedAttempts);
@@ -806,7 +806,7 @@ public class TestTaskRecovery {
task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0));
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
- 0, TaskAttemptState.FAILED, null, "", null, 0, null));
+ 0, TaskAttemptState.FAILED, null, "", null, null));
}
assertEquals(maxFailedAttempts, task.getAttempts().size());
assertEquals(maxFailedAttempts, task.failedAttempts);
@@ -836,7 +836,7 @@ public class TestTaskRecovery {
task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0));
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
- 0, TaskAttemptState.FAILED, null, "", null, 0, null));
+ 0, TaskAttemptState.FAILED, null, "", null, null));
}
assertEquals(maxFailedAttempts - 1, task.getAttempts().size());
assertEquals(maxFailedAttempts - 1, task.failedAttempts);
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index b215a06..5c8c90e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -48,6 +48,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
import org.apache.tez.dag.app.dag.impl.VertexStats;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
@@ -508,9 +509,7 @@ public class TestHistoryEventsProtoConversion {
TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
"vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
- null, null, null, 1024,
- TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
- TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 110), 1), 1));
+ null, null, null, null);
TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getTaskAttemptID(),
@@ -523,16 +522,20 @@ public class TestHistoryEventsProtoConversion {
deserializedEvent.getState());
Assert.assertEquals(event.getCounters(),
deserializedEvent.getCounters());
- Assert.assertEquals(event.getLastDataEventTime(), deserializedEvent.getLastDataEventTime());
- Assert.assertEquals(event.getLastDataEventSourceTA(), deserializedEvent.getLastDataEventSourceTA());
logEvents(event, deserializedEvent);
}
{
+ TezTaskAttemptID taId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 0), 0);
+ long timestamp = 1024L;
+ List<DataEventDependencyInfo> events = Lists.newArrayList();
+ events.add(new DataEventDependencyInfo(timestamp, taId));
+ events.add(new DataEventDependencyInfo(timestamp, taId));
TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(
TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
"vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
- TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters(), 0, null);
+ TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters(), events);
TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getTaskAttemptID(),
@@ -547,6 +550,9 @@ public class TestHistoryEventsProtoConversion {
deserializedEvent.getCounters());
Assert.assertEquals(event.getTaskAttemptError(),
deserializedEvent.getTaskAttemptError());
+      // Check the round trip: the deserialized event (not the original input object) must
+      // carry the same data event dependency list that was handed to the constructor.
+      Assert.assertEquals(events.size(), deserializedEvent.getDataEvents().size());
+      Assert.assertEquals(events.get(0).getTimestamp(),
+          deserializedEvent.getDataEvents().get(0).getTimestamp());
+      Assert.assertEquals(events.get(0).getTaskAttemptId(),
+          deserializedEvent.getDataEvents().get(0).getTaskAttemptId());
logEvents(event, deserializedEvent);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index 9c11dc7..711e4bb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -164,7 +164,7 @@ public class TestHistoryEventJsonConversion {
break;
case TASK_ATTEMPT_FINISHED:
event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
- random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null, 0, null);
+ random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null, null);
break;
case CONTAINER_LAUNCHED:
event = new ContainerLaunchedEvent(containerId, random.nextInt(),
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
index ccec0db..ca008ce 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
@@ -20,6 +20,7 @@ package org.apache.tez.history.parser.datamodel;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
@@ -28,10 +29,13 @@ import org.apache.tez.common.ATSConstants;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.history.parser.utils.Utils;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
+import java.util.ArrayList;
import java.util.Comparator;
+import java.util.List;
import java.util.Map;
import static org.apache.hadoop.classification.InterfaceStability.Evolving;
@@ -53,14 +57,29 @@ public class TaskAttemptInfo extends BaseInfo {
private final String status;
private final String logUrl;
private final String creationCausalTA;
- private final long lastDataEventTime;
- private final String lastDataEventSourceTA;
private final String terminationCause;
private final long executionTimeInterval;
+ // this list is in time order - array list for easy walking
+ private final ArrayList<DataDependencyEvent> lastDataEvents = Lists.newArrayList();
private TaskInfo taskInfo;
private Container container;
+
+  /**
+   * Simple holder pairing the id of a task attempt (kept as a string) with a timestamp.
+   */
+  public static class DataDependencyEvent {
+    String taId;
+    long timestamp;
+
+    /**
+     * @param id task attempt id, stored as given
+     * @param time timestamp stored alongside the id
+     */
+    public DataDependencyEvent(String id, long time) {
+      this.timestamp = time;
+      this.taId = id;
+    }
+
+    /** @return the stored task attempt id */
+    public String getTaskAttemptId() {
+      return this.taId;
+    }
+
+    /** @return the stored timestamp */
+    public long getTimestamp() {
+      return this.timestamp;
+    }
+  }
TaskAttemptInfo(JSONObject jsonObject) throws JSONException {
super(jsonObject);
@@ -87,9 +106,17 @@ public class TaskAttemptInfo extends BaseInfo {
status = StringInterner.weakIntern(otherInfoNode.optString(Constants.STATUS));
container = new Container(containerId, nodeId);
- lastDataEventTime = otherInfoNode.optLong(ATSConstants.LAST_DATA_EVENT_TIME);
- lastDataEventSourceTA = StringInterner.weakIntern(
- otherInfoNode.optString(ATSConstants.LAST_DATA_EVENT_SOURCE_TA));
+ if (otherInfoNode.has(Constants.LAST_DATA_EVENTS)) {
+ List<DataDependencyEvent> eventInfo = Utils.parseDataEventDependencyFromJSON(
+ otherInfoNode.optJSONObject(Constants.LAST_DATA_EVENTS));
+ long lastTime = 0;
+ for (DataDependencyEvent item : eventInfo) {
+ // check these are in time order
+ Preconditions.checkState(lastTime < item.getTimestamp());
+ lastTime = item.getTimestamp();
+ lastDataEvents.add(item);
+ }
+ }
terminationCause = StringInterner
.weakIntern(otherInfoNode.optString(ATSConstants.TASK_ATTEMPT_ERROR_ENUM));
executionTimeInterval = (endTime > startTime) ? (endTime - startTime) : 0;
@@ -121,6 +148,10 @@ public class TaskAttemptInfo extends BaseInfo {
return endTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime());
}
+ /**
+ * Returns the data dependency events recorded for this task attempt.
+ * The list is the internal one (not a copy); the constructor only adds events
+ * whose timestamps are strictly increasing, so it is in time order.
+ */
+ public final List<DataDependencyEvent> getLastDataEvents() {
+ return lastDataEvents;
+ }
+
public final long getExecutionTimeInterval() {
return executionTimeInterval;
}
@@ -149,14 +180,17 @@ public class TaskAttemptInfo extends BaseInfo {
return creationTime;
}
- public final long getLastDataEventTime() {
- return lastDataEventTime;
+  /**
+   * Finds the latest data event whose timestamp is strictly earlier than the threshold.
+   *
+   * @param timeThreshold exclusive upper bound on the timestamp of the returned event
+   * @return the last entry of {@code lastDataEvents} with a timestamp below
+   *         {@code timeThreshold}, or {@code null} when no entry qualifies
+   */
+  public final DataDependencyEvent getLastDataEventInfo(long timeThreshold) {
+    // scan from the end of the list towards the start and stop at the first match
+    int idx = lastDataEvents.size();
+    while (--idx >= 0) {
+      DataDependencyEvent candidate = lastDataEvents.get(idx);
+      if (candidate.getTimestamp() < timeThreshold) {
+        return candidate;
+      }
+    }
+    // nothing happened before the threshold
+    return null;
   }
- public final String getLastDataEventSourceTA() {
- return lastDataEventSourceTA;
- }
-
public final long getTimeTaken() {
return getFinishTimeInterval() - getStartTimeInterval();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
index 7345012..ffb854a 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
@@ -19,7 +19,10 @@
package org.apache.tez.history.parser.utils;
import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.util.StringInterner;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -27,8 +30,10 @@ import org.apache.log4j.PatternLayout;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.history.logging.EntityTypes;
import org.apache.tez.history.parser.datamodel.Constants;
import org.apache.tez.history.parser.datamodel.Event;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo.DataDependencyEvent;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@@ -83,6 +88,19 @@ public class Utils {
}
return counters;
}
+
+  /**
+   * Parse the data event dependency list of a task attempt from json.
+   *
+   * @param jsonObject json object holding an array under
+   *                   {@code Constants.LAST_DATA_EVENTS}; may be null
+   * @return events in the order they appear in the array; an empty list when
+   *         {@code jsonObject} is null or holds no such array
+   * @throws JSONException if an element of the array is not a json object
+   */
+  public static List<DataDependencyEvent> parseDataEventDependencyFromJSON(JSONObject jsonObject)
+      throws JSONException {
+    List<DataDependencyEvent> events = Lists.newArrayList();
+    if (jsonObject == null) {
+      // the argument typically comes from optJSONObject(), which yields null for non-objects
+      return events;
+    }
+    JSONArray fields = jsonObject.optJSONArray(Constants.LAST_DATA_EVENTS);
+    if (fields == null) {
+      // optJSONArray() yields null when the key is absent or its value is not an array
+      return events;
+    }
+    for (int i = 0; i < fields.length(); i++) {
+      JSONObject eventMap = fields.getJSONObject(i);
+      events.add(new DataDependencyEvent(
+          StringInterner.weakIntern(eventMap.optString(EntityTypes.TEZ_TASK_ATTEMPT_ID.name())),
+          eventMap.optLong(Constants.TIMESTAMP)));
+    }
+    return events;
+  }
/**
* Parse events from json
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
index 2b797a5..8dbfdc9 100644
--- a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
+++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
@@ -63,6 +63,7 @@ import org.apache.tez.history.parser.datamodel.BaseInfo;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.history.parser.datamodel.EdgeInfo;
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo.DataDependencyEvent;
import org.apache.tez.history.parser.datamodel.TaskInfo;
import org.apache.tez.history.parser.datamodel.VersionInfo;
import org.apache.tez.history.parser.datamodel.VertexInfo;
@@ -77,7 +78,6 @@ import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.tests.MiniTezClusterWithTimeline;
import org.junit.AfterClass;
-import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -288,12 +288,14 @@ public class TestHistoryParser {
}
} else {
for (TaskAttemptInfo attempt : attempts) {
- assertTrue(attempt.getLastDataEventTime() > 0);
+ DataDependencyEvent item = attempt.getLastDataEvents().get(0);
+ assertTrue(item.getTimestamp() > 0);
+
if (lastDataEventSourceTA == null) {
- lastDataEventSourceTA = attempt.getLastDataEventSourceTA();
+ lastDataEventSourceTA = item.getTaskAttemptId();
} else {
// all attempts should have the same last data event source TA
- assertTrue(lastDataEventSourceTA.equals(attempt.getLastDataEventSourceTA()));
+ assertTrue(lastDataEventSourceTA.equals(item.getTaskAttemptId()));
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 1b7e183..4685a61 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -435,10 +435,9 @@ public class HistoryEventTimelineConversion {
atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
atsEntity.addOtherInfo(ATSConstants.COUNTERS,
DAGUtils.convertCountersToATSMap(event.getCounters()));
- atsEntity.addOtherInfo(ATSConstants.LAST_DATA_EVENT_TIME, event.getLastDataEventTime());
- if (event.getLastDataEventSourceTA() != null) {
- atsEntity.addOtherInfo(ATSConstants.LAST_DATA_EVENT_SOURCE_TA,
- event.getLastDataEventSourceTA().toString());
+ if (event.getDataEvents() != null && !event.getDataEvents().isEmpty()) {
+ atsEntity.addOtherInfo(ATSConstants.LAST_DATA_EVENTS,
+ DAGUtils.convertDataEventDependecyInfoToATS(event.getDataEvents()));
}
return atsEntity;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 8db32b0..2849c10 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.history.logging.ats;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@@ -45,6 +46,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
import org.apache.tez.dag.app.dag.impl.VertexStats;
import org.apache.tez.dag.app.web.AMWebController;
import org.apache.tez.dag.history.HistoryEvent;
@@ -85,6 +87,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.collect.Lists;
+
public class TestHistoryEventTimelineConversion {
private ApplicationAttemptId applicationAttemptId;
@@ -170,7 +174,7 @@ public class TestHistoryEventTimelineConversion {
break;
case TASK_ATTEMPT_FINISHED:
event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
- random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null, 0, null);
+ random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null, null);
break;
case CONTAINER_LAUNCHED:
event = new ContainerLaunchedEvent(containerId, random.nextInt(),
@@ -445,6 +449,7 @@ public class TestHistoryEventTimelineConversion {
timelineEntity.getOtherInfo().get(ATSConstants.USER));
}
+ @SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testConvertTaskAttemptFinishedEvent() {
String vertexName = "testVertex";
@@ -457,9 +462,12 @@ public class TestHistoryEventTimelineConversion {
String diagnostics = "random diagnostics message";
TezCounters counters = new TezCounters();
long lastDataEventTime = finishTime - 1;
+ List<DataEventDependencyInfo> events = Lists.newArrayList();
+ events.add(new DataEventDependencyInfo(lastDataEventTime, tezTaskAttemptID));
+ events.add(new DataEventDependencyInfo(lastDataEventTime, tezTaskAttemptID));
TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName,
- startTime, finishTime, state, error, diagnostics, counters, lastDataEventTime, tezTaskAttemptID);
+ startTime, finishTime, state, error, diagnostics, counters, events);
TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
@@ -482,14 +490,17 @@ public class TestHistoryEventTimelineConversion {
Assert.assertEquals(finishTime, evt.getTimestamp());
final Map<String, Object> otherInfo = timelineEntity.getOtherInfo();
- Assert.assertEquals(8, otherInfo.size());
+ Assert.assertEquals(7, otherInfo.size());
Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME));
Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN));
Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS));
Assert.assertEquals(error.name(), otherInfo.get(ATSConstants.TASK_ATTEMPT_ERROR_ENUM));
Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS));
- Assert.assertEquals(lastDataEventTime, otherInfo.get(ATSConstants.LAST_DATA_EVENT_TIME));
- Assert.assertEquals(tezTaskAttemptID.toString(), otherInfo.get(ATSConstants.LAST_DATA_EVENT_SOURCE_TA));
+ Map<String, Object> obj1 = (Map<String, Object>)otherInfo.get(ATSConstants.LAST_DATA_EVENTS);
+ List<Object> obj2 = (List<Object>) obj1.get(ATSConstants.LAST_DATA_EVENTS);
+ Assert.assertEquals(2, obj2.size());
+ Map<String, Object> obj3 = (Map<String, Object>) obj2.get(0);
+ Assert.assertEquals(events.get(0).getTimestamp(), obj3.get(ATSConstants.TIMESTAMP));
Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS));
}
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
index c8d4225..350f783 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
@@ -33,11 +33,11 @@ import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.Ent
import org.apache.tez.analyzer.utils.SVGUtils;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
-import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.history.parser.datamodel.Container;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo.DataDependencyEvent;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -165,8 +165,10 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
.getAvgExecutionTimeInterval();
if (avgExecutionTime * 1.25 < attempt.getExecutionTimeInterval()) {
step.notes
- .add("Potential straggler. Execution time " + attempt.getExecutionTimeInterval()
- + " compared to vertex average of " + avgExecutionTime);
+ .add("Potential straggler. Execution time " +
+ SVGUtils.getTimeStr(attempt.getExecutionTimeInterval())
+ + " compared to vertex average of " +
+ SVGUtils.getTimeStr(avgExecutionTime));
}
if (attempt.getStartTime() > step.startCriticalPathTime) {
@@ -231,14 +233,44 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
Preconditions.checkState(currentAttemptStopCriticalPathTime > 0);
System.out.println(
"Step: " + tempCP.size() + " Attempt: " + currentAttempt.getTaskAttemptId());
+
currentStep = new CriticalPathStep(currentAttempt, EntityType.ATTEMPT);
currentStep.stopCriticalPathTime = currentAttemptStopCriticalPathTime;
+
+ // consider the last data event seen immediately preceding the current critical path
+ // stop time for this attempt
+ long currentStepLastDataEventTime = 0;
+ String currentStepLastDataTA = null;
+ DataDependencyEvent item = currentAttempt.getLastDataEventInfo(currentStep.stopCriticalPathTime);
+ if (item!=null) {
+ currentStepLastDataEventTime = item.getTimestamp();
+ currentStepLastDataTA = item.getTaskAttemptId();
+ }
+
+ // sanity check
+ for (CriticalPathStep previousStep : tempCP) {
+ if (previousStep.type == EntityType.ATTEMPT) {
+ if (previousStep.attempt.getTaskAttemptId().equals(currentAttempt.getTaskAttemptId())) {
+ // found loop.
+ // this should only happen for read errors in currentAttempt
+ List<DataDependencyEvent> dataEvents = currentAttempt.getLastDataEvents();
+ Preconditions.checkState(dataEvents.size() > 1); // received
+ // original and
+ // retry data events
+ Preconditions.checkState(currentStepLastDataEventTime < dataEvents
+ .get(dataEvents.size() - 1).getTimestamp()); // new event is
+ // earlier than
+ // last
+ }
+ }
+ }
+
tempCP.add(currentStep);
// find the next attempt on the critical path
boolean dataDependency = false;
// find out predecessor dependency
- if (currentAttempt.getLastDataEventTime() > currentAttempt.getCreationTime()) {
+ if (currentStepLastDataEventTime > currentAttempt.getCreationTime()) {
dataDependency = true;
}
@@ -248,13 +280,13 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
if (dataDependency) {
// last data event was produced after the attempt was scheduled. use
// data dependency
- // typically case when scheduling ahead of time
+ // typically the case when scheduling ahead of time
System.out.println("Has data dependency");
- if (!Strings.isNullOrEmpty(currentAttempt.getLastDataEventSourceTA())) {
+ if (!Strings.isNullOrEmpty(currentStepLastDataTA)) {
// there is a valid data causal TA. Use it.
- nextAttemptId = currentAttempt.getLastDataEventSourceTA();
+ nextAttemptId = currentStepLastDataTA;
reason = CriticalPathDependency.DATA_DEPENDENCY;
- startCriticalPathTime = currentAttempt.getLastDataEventTime();
+ startCriticalPathTime = currentStepLastDataEventTime;
System.out.println("Using data dependency " + nextAttemptId);
} else {
// there is no valid data causal TA. This means data event came from the same vertex
@@ -289,20 +321,30 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
}
}
}
- startCriticalPathTime = currentAttempt.getCreationTime();
+ if (reason == CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY) {
+ // rescheduled due to read error. start the critical path at the read error report time.
+ // for now, use own creation time as a proxy for the read error report time
+ startCriticalPathTime = currentAttempt.getCreationTime();
+ } else {
+ // rescheduled due to own previous attempt failure
+ // we are critical when the previous attempt fails
+ Preconditions.checkState(nextAttempt != null);
+ Preconditions.checkState(nextAttempt.getTaskInfo().getTaskId().equals(
+ currentAttempt.getTaskInfo().getTaskId()));
+ startCriticalPathTime = nextAttempt.getFinishTime();
+ }
System.out.println("Using scheduling dependency " + nextAttemptId);
} else {
// there is no scheduling causal TA.
- if (!Strings.isNullOrEmpty(currentAttempt.getLastDataEventSourceTA())) {
+ if (!Strings.isNullOrEmpty(currentStepLastDataTA)) {
// there is a data event going to the vertex. Count the time between data event and
- // scheduling time as Initializer/Manager overhead and follow data dependency
- nextAttemptId = currentAttempt.getLastDataEventSourceTA();
+ // creation time as Initializer/Manager overhead and follow data dependency
+ nextAttemptId = currentStepLastDataTA;
reason = CriticalPathDependency.DATA_DEPENDENCY;
- startCriticalPathTime = currentAttempt.getLastDataEventTime();
- long overhead = currentAttempt.getCreationTime()
- - currentAttempt.getLastDataEventTime();
+ startCriticalPathTime = currentStepLastDataEventTime;
+ long overhead = currentAttempt.getCreationTime() - currentStepLastDataEventTime;
currentStep.notes
- .add("Initializer/VertexManager scheduling overhead " + overhead + " ms");
+ .add("Initializer/VertexManager scheduling overhead " + SVGUtils.getTimeStr(overhead));
System.out.println("Using data dependency " + nextAttemptId);
} else {
// there is no scheduling causal TA and no data event casual TA.
@@ -316,89 +358,6 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
}
}
-
- if (!Strings.isNullOrEmpty(nextAttemptId)) {
- TaskAttemptInfo nextAttempt = attempts.get(nextAttemptId);
- TaskAttemptInfo attemptToCheck = nextAttempt;
-
- // check if the next attempt is already on critical path to prevent infinite loop
- boolean foundLoop = false;
- CriticalPathDependency prevReason = null;
- for (CriticalPathStep previousStep : tempCP) {
- if (previousStep.attempt.equals(attemptToCheck)) {
- foundLoop = true;
- prevReason = previousStep.reason;
- }
- }
-
- if (foundLoop) {
- // found a loop - find the next step based on heuristics
- /* only the losing outputs causes us to backtrack. There are 2 cases
- * 1) Step N reported last data event to this step
- * -> Step N+1 (current step) is the retry for read error reported
- * -> read error was reported by the Step N attempt and it did not exit after the
- * error
- * -> So scheduling dependency of Step N points back to step N+1
- * 2) Step N reported last data event to this step
- * -> Step N+1 is a retry for a read error reported
- * -> Step N+2 is the attempt that reported the read error
- * -> Step N+3 is the last data event of N+2 and points back to N+1
- */
- System.out.println("Reset " + currentAttempt.getTaskAttemptId()
- + " cause: " + currentAttempt.getTerminationCause()
- + " time: " + currentAttempt.getFinishTime()
- + " reason: " + reason
- + " because of: " + attemptToCheck.getTaskAttemptId());
- TaskAttemptInfo attemptWithLostAncestor = currentAttempt;
- if (reason != CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY) {
- // Case 2 above. If reason == CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY
- // then its Case 1 above
- Preconditions.checkState(prevReason.equals(
- CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), prevReason);
- reason = CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY;
- attemptWithLostAncestor = nextAttempt;
- }
- System.out.println("Reset " + currentAttempt.getTaskAttemptId()
- + " cause: " + currentAttempt.getTerminationCause()
- + " time: " + currentAttempt.getFinishTime()
- + " reason: " + reason
- + " because of: " + attemptToCheck.getTaskAttemptId()
- + " looking at: " + attemptWithLostAncestor.getTaskAttemptId());
- Preconditions.checkState(reason == CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY);
- // we dont track all input events to the consumer. So just jump to
- // the previous successful version of the current attempt
- TaskAttemptInfo prevSuccAttempt = null;
- for (TaskAttemptInfo prevAttempt : attemptWithLostAncestor.getTaskInfo().getTaskAttempts()) {
- System.out.println("Looking at " + prevAttempt.getTaskAttemptId()
- + " cause: " + prevAttempt.getTerminationCause() +
- " time: " + prevAttempt.getFinishTime());
- if (prevAttempt.getTerminationCause()
- .equals(TaskAttemptTerminationCause.OUTPUT_LOST.name())) {
- if (prevAttempt.getFinishTime() < currentAttempt.getFinishTime()) {
- // attempt finished before current attempt
- if (prevSuccAttempt == null
- || prevAttempt.getFinishTime() > prevSuccAttempt.getFinishTime()) {
- // keep the latest attempt that had lost outputs
- prevSuccAttempt = prevAttempt;
- }
- }
- }
- }
- Preconditions.checkState(prevSuccAttempt != null,
- attemptWithLostAncestor.getTaskAttemptId());
- System.out
- .println("Resetting nextAttempt to : " + prevSuccAttempt.getTaskAttemptId()
- + " from " + nextAttempt.getTaskAttemptId());
- nextAttemptId = prevSuccAttempt.getTaskAttemptId();
- if (attemptWithLostAncestor == currentAttempt) {
- startCriticalPathTime = currentAttempt.getCreationTime();
- } else {
- startCriticalPathTime = prevSuccAttempt.getFinishTime();
- }
- }
-
- }
-
currentStep.startCriticalPathTime = startCriticalPathTime;
currentStep.reason = reason;
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
index 9a75461..f3a69a6 100644
--- a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
@@ -182,7 +182,7 @@ public class TestAnalyzer {
DagInfo runDAGAndVerify(DAG dag, DAGStatus.State finalState, List<StepCheck[]> steps) throws Exception {
tezSession.waitTillReady();
numDAGs++;
- LOG.info("XXX Running DAG name: " + dag.getName());
+ LOG.info("ABC Running DAG name: " + dag.getName());
DAGClient dagClient = tezSession.submitDAG(dag);
DAGStatus dagStatus = dagClient.getDAGStatus(null);
while (!dagStatus.isCompleted()) {
@@ -230,13 +230,13 @@ public class TestAnalyzer {
List<CriticalPathStep> criticalPath = cp.getCriticalPath();
for (CriticalPathStep step : criticalPath) {
- LOG.info("XXX Step: " + step.getType());
+ LOG.info("ABC Step: " + step.getType());
if (step.getType() == EntityType.ATTEMPT) {
- LOG.info("XXX Attempt: " + step.getAttempt().getShortName() + " " + step.getAttempt().getDetailedStatus());
+ LOG.info("ABC Attempt: " + step.getAttempt().getShortName() + " " + step.getAttempt().getDetailedStatus());
}
- LOG.info("XXX Reason: " + step.getReason());
+ LOG.info("ABC Reason: " + step.getReason());
String notes = Joiner.on(";").join(step.getNotes());
- LOG.info("XXX Notes: " + notes);
+ LOG.info("ABC Notes: " + notes);
}
boolean foundMatchingLength = false;
@@ -361,6 +361,7 @@ public class TestAnalyzer {
StepCheck[] check = {
createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+ createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
};
@@ -416,7 +417,9 @@ public class TestAnalyzer {
StepCheck[] check = {
createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+ createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+ createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
createStep("v1 : 000000_2", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
};
@@ -484,6 +487,9 @@ public class TestAnalyzer {
StepCheck[] check = {
createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+ createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+ createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+ createStep("v2 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
createStep("v2 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY),
createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
@@ -553,7 +559,13 @@ public class TestAnalyzer {
StepCheck[] check = {
// use regex for either vertices being possible on the path
createStep("v[12] : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
- createStep("v[12] : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+ createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+ createStep("v[12] : 000000_[01]", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+ createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+ createStep("v[12] : 000000_[012]", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+ createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+ createStep("v[12] : 000000_[12]", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+ createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
createStep("v[12] : 000000_2", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
};
@@ -643,17 +655,11 @@ public class TestAnalyzer {
StepCheck[] check1 = {
// use regex for either vertices being possible on the path
createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
- createStep("v[23] : 000000_0", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+ createStep("v[23] : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
createStep("v[23] : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
};
- StepCheck[] check2 = {
- createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
- createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
- createStep("v[23] : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
- };
stepsOptions.add(check1);
- stepsOptions.add(check2);
DAG dag = SimpleReverseVTestDAG.createDAG(
"testInputFailureRerunCanSendOutputToTwoDownstreamVertices", testConf);
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, stepsOptions);