You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/09/08 01:50:52 UTC

tez git commit: TEZ-2778. Improvements to handle multiple read errors with complex DAGs (bikas)

Repository: tez
Updated Branches:
  refs/heads/master db725e568 -> 171d48504


TEZ-2778. Improvements to handle multiple read errors with complex DAGs (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/171d4850
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/171d4850
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/171d4850

Branch: refs/heads/master
Commit: 171d4850443a9e67aa5a68d4c8482779c0caa8eb
Parents: db725e5
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Sep 7 16:50:33 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Sep 7 16:50:33 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/common/ATSConstants.java     |   3 +-
 .../tez/dag/app/TaskCommunicatorManager.java    |  13 +-
 .../dag/event/TaskAttemptEventStatusUpdate.java |   9 ++
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  72 +++++++--
 .../events/TaskAttemptFinishedEvent.java        |  43 ++---
 .../impl/HistoryEventJsonConversion.java        |   6 +-
 .../apache/tez/dag/history/utils/DAGUtils.java  |  24 +++
 tez-dag/src/main/proto/HistoryEvents.proto      |   8 +-
 .../dag/app/TestTaskCommunicatorManager1.java   |  33 ++++
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |  87 ++++++++++
 .../app/dag/impl/TestTaskAttemptRecovery.java   |  15 +-
 .../tez/dag/app/dag/impl/TestTaskRecovery.java  |  34 ++--
 .../TestHistoryEventsProtoConversion.java       |  18 ++-
 .../impl/TestHistoryEventJsonConversion.java    |   2 +-
 .../parser/datamodel/TaskAttemptInfo.java       |  56 +++++--
 .../apache/tez/history/parser/utils/Utils.java  |  18 +++
 .../apache/tez/history/TestHistoryParser.java   |  10 +-
 .../ats/HistoryEventTimelineConversion.java     |   7 +-
 .../ats/TestHistoryEventTimelineConversion.java |  21 ++-
 .../analyzer/plugins/CriticalPathAnalyzer.java  | 157 +++++++------------
 .../org/apache/tez/analyzer/TestAnalyzer.java   |  32 ++--
 22 files changed, 462 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9df51c0..bce05c0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2778. Improvements to handle multiple read errors with complex DAGs
   TEZ-2768. Log a useful error message when the summary stream cannot be closed when shutting
   down an AM.
   TEZ-2745. ClassNotFoundException of user code should fail dag

http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index ad9270f..f786a4e 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -84,8 +84,7 @@ public class ATSConstants {
   public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL";
   public static final String COMPLETED_LOGS_URL = "completedLogsURL";
   public static final String EXIT_STATUS = "exitStatus";
-  public static final String LAST_DATA_EVENT_TIME = "lastDataEventTime";
-  public static final String LAST_DATA_EVENT_SOURCE_TA = "lastDataEventSourceTA";
+  public static final String LAST_DATA_EVENTS = "lastDataEvents";
   public static final String UPDATED_EDGE_MANAGERS = "updatedEdgeManagers";
   public static final String CREATION_CAUSAL_ATTEMPT = "creationCausalAttempt";
 

http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
index cfb34ac..2cc6ae2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -35,7 +35,6 @@ import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventType;
@@ -234,19 +233,27 @@ public class TaskCommunicatorManager extends AbstractService implements
       // to VertexImpl to ensure the events ordering
       //  1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent
       //  2. TaskStatusEvent is handled before TaskAttemptFinishedEvent
+      TaskAttemptEventStatusUpdate taskAttemptEvent = null;
+      boolean readErrorReported = false;
       for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
         // for now, set the event time on the AM when it is received.
         // this avoids any time disparity between machines.
         tezEvent.setEventReceivedTime(currTime);
         final EventType eventType = tezEvent.getEventType();
         if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
-          TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
+          taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
               (TaskStatusUpdateEvent) tezEvent.getEvent());
-          context.getEventHandler().handle(taskAttemptEvent);
         } else {
+          if (eventType == EventType.INPUT_READ_ERROR_EVENT) {
+            readErrorReported = true;
+          }
           otherEvents.add(tezEvent);
         }
       }
+      if (taskAttemptEvent != null) {
+        taskAttemptEvent.setReadErrorReported(readErrorReported);
+        context.getEventHandler().handle(taskAttemptEvent);
+      }
       if(!otherEvents.isEmpty()) {
         TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
         context.getEventHandler().handle(

http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
index c5a6ea7..458679c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
@@ -24,6 +24,7 @@ import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent {
   
   private TaskStatusUpdateEvent taskAttemptStatus;
+  private boolean readErrorReported = false;
   
   public TaskAttemptEventStatusUpdate(TezTaskAttemptID id,
       TaskStatusUpdateEvent statusEvent) {
@@ -34,4 +35,12 @@ public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent {
   public TaskStatusUpdateEvent getStatusEvent() {
     return this.taskAttemptStatus;
   }
+  
+  public void setReadErrorReported(boolean value) {
+    readErrorReported = value;
+  }
+  
+  public boolean getReadErrorReported() {
+    return readErrorReported;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index e57c827..003e05f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -97,6 +97,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto;
 import org.apache.tez.dag.utils.TezBuilderUtils;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
@@ -119,6 +120,37 @@ public class TaskAttemptImpl implements TaskAttempt,
   private static final Logger LOG = LoggerFactory.getLogger(TaskAttemptImpl.class);
   private static final String LINE_SEPARATOR = System
       .getProperty("line.separator");
+  
+  public static class DataEventDependencyInfo {
+    long timestamp;
+    TezTaskAttemptID taId;
+    public DataEventDependencyInfo(long time, TezTaskAttemptID id) {
+      this.timestamp = time;
+      this.taId = id;
+    }
+    public long getTimestamp() {
+      return timestamp;
+    }
+    public TezTaskAttemptID getTaskAttemptId() {
+      return taId;
+    }
+    public static DataEventDependencyInfoProto toProto(DataEventDependencyInfo info) {
+      DataEventDependencyInfoProto.Builder builder = DataEventDependencyInfoProto.newBuilder();
+      builder.setTimestamp(info.timestamp);
+      if (info.taId != null) {
+        builder.setTaskAttemptId(info.taId.toString());
+      }
+      return builder.build();
+    }
+    
+    public static DataEventDependencyInfo fromProto(DataEventDependencyInfoProto proto) {
+      TezTaskAttemptID taId = null;
+      if(proto.hasTaskAttemptId()) {
+        taId = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
+      }
+      return new DataEventDependencyInfo(proto.getTimestamp(), taId);
+    }
+  }
 
   static final TezCounters EMPTY_COUNTERS = new TezCounters();
 
@@ -150,8 +182,8 @@ public class TaskAttemptImpl implements TaskAttempt,
   private final Vertex vertex;
 
   @VisibleForTesting
-  long lastDataEventTime;
-  TezTaskAttemptID lastDataEventSourceTA = null;
+  boolean appendNextDataEvent = true;
+  ArrayList<DataEventDependencyInfo> lastDataEvents = Lists.newArrayList();
   
   @VisibleForTesting
   TaskAttemptStatus reportedStatus;
@@ -822,8 +854,9 @@ public class TaskAttemptImpl implements TaskAttempt,
               : TaskAttemptTerminationCause.UNKNOWN_ERROR;
           this.diagnostics.add(tEvent.getDiagnostics());
           this.recoveredState = tEvent.getState();
-          this.lastDataEventTime = tEvent.getLastDataEventTime();
-          this.lastDataEventSourceTA = tEvent.getLastDataEventSourceTA();
+          if (tEvent.getDataEvents() != null) {
+            this.lastDataEvents.addAll(tEvent.getDataEvents());
+          }
           sendEvent(createDAGCounterUpdateEventTAFinished(this, tEvent.getState()));
           return recoveredState;
         }
@@ -1040,7 +1073,7 @@ public class TaskAttemptImpl implements TaskAttempt,
     TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
         attemptId, getVertex().getName(), getLaunchTime(),
         getFinishTime(), TaskAttemptState.SUCCEEDED, null,
-        "", getCounters(), lastDataEventTime, lastDataEventSourceTA);
+        "", getCounters(), lastDataEvents);
     // FIXME how do we store information regd completion events
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1057,8 +1090,7 @@ public class TaskAttemptImpl implements TaskAttempt,
         finishTime, state,
         terminationCause,
         StringUtils.join(
-            getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEventTime, 
-        lastDataEventSourceTA);
+            getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEvents);
     // FIXME how do we store information regd completion events
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1324,12 +1356,16 @@ public class TaskAttemptImpl implements TaskAttempt,
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @Override
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
-      TaskStatusUpdateEvent statusEvent = ((TaskAttemptEventStatusUpdate) event)
-          .getStatusEvent();
+      TaskAttemptEventStatusUpdate sEvent = (TaskAttemptEventStatusUpdate) event; 
+      TaskStatusUpdateEvent statusEvent = sEvent.getStatusEvent();
       ta.reportedStatus.state = ta.getState();
       ta.reportedStatus.progress = statusEvent.getProgress();
       ta.reportedStatus.counters = statusEvent.getCounters();
       ta.statistics = statusEvent.getStatistics();
+      if (sEvent.getReadErrorReported()) {
+        // if there is a read error then track the next last data event
+        ta.appendNextDataEvent = true;
+      }
 
       ta.updateProgressSplits();
 
@@ -1655,8 +1691,20 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   @Override
   public void setLastEventSent(TezEvent lastEventSent) {
-    // task attempt id may be null for input data information events
-    this.lastDataEventSourceTA = lastEventSent.getSourceInfo().getTaskAttemptID();
-    this.lastDataEventTime = lastEventSent.getEventReceivedTime();
+    writeLock.lock();
+    try {
+      DataEventDependencyInfo info = new DataEventDependencyInfo(
+          lastEventSent.getEventReceivedTime(), lastEventSent.getSourceInfo().getTaskAttemptID());
+      // task attempt id may be null for input data information events
+      if (appendNextDataEvent) {
+        appendNextDataEvent = false;
+        lastDataEvents.add(info);
+      } else {
+        // over-write last event - array list makes it quick
+        lastDataEvents.set(lastDataEvents.size() - 1, info);
+      }
+    } finally {
+      writeLock.unlock();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 52761e2..fbde635 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -21,16 +21,22 @@ package org.apache.tez.dag.history.events;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptFinishedProto;
 
 public class TaskAttemptFinishedEvent implements HistoryEvent {
@@ -45,9 +51,8 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
   private String diagnostics;
   private TezCounters tezCounters;
   private TaskAttemptTerminationCause error;
-  private TezTaskAttemptID lastDataEventSourceTA;
-  private long lastDataEventTime;
-
+  private List<DataEventDependencyInfo> dataEvents;
+  
   public TaskAttemptFinishedEvent(TezTaskAttemptID taId,
       String vertexName,
       long startTime,
@@ -55,8 +60,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
       TaskAttemptState state,
       TaskAttemptTerminationCause error,
       String diagnostics, TezCounters counters, 
-      long lastDataEventTime, 
-      TezTaskAttemptID lastDataEventSourceTA) {
+      List<DataEventDependencyInfo> dataEvents) {
     this.taskAttemptId = taId;
     this.vertexName = vertexName;
     this.startTime = startTime;
@@ -65,8 +69,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
     this.diagnostics = diagnostics;
     this.tezCounters = counters;
     this.error = error;
-    this.lastDataEventTime = lastDataEventTime;
-    this.lastDataEventSourceTA = lastDataEventSourceTA;
+    this.dataEvents = dataEvents;
   }
 
   public TaskAttemptFinishedEvent() {
@@ -87,14 +90,10 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
     return true;
   }
   
-  public long getLastDataEventTime() {
-    return lastDataEventTime;
+  public List<DataEventDependencyInfo> getDataEvents() {
+    return dataEvents;
   }
   
-  public TezTaskAttemptID getLastDataEventSourceTA() {
-    return lastDataEventSourceTA;
-  }
-
   public TaskAttemptFinishedProto toProto() {
     TaskAttemptFinishedProto.Builder builder =
         TaskAttemptFinishedProto.newBuilder();
@@ -110,9 +109,10 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
     if (tezCounters != null) {
       builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
     }
-    if (lastDataEventSourceTA != null) {
-      builder.setLastDataEventSourceTA(lastDataEventSourceTA.toString());
-      builder.setLastDataEventTime(lastDataEventTime);
+    if (dataEvents != null && !dataEvents.isEmpty()) {
+      for (DataEventDependencyInfo info : dataEvents) {
+        builder.addDataEvents(DataEventDependencyInfo.toProto(info));
+      }
     }
     return builder.build();
   }
@@ -131,9 +131,11 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
       this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
         proto.getCounters());
     }
-    if (proto.hasLastDataEventSourceTA()) {
-      this.lastDataEventSourceTA = TezTaskAttemptID.fromString(proto.getLastDataEventSourceTA());
-      this.lastDataEventTime = proto.getLastDataEventTime();
+    if (proto.getDataEventsCount() > 0) {
+      this.dataEvents = Lists.newArrayListWithCapacity(proto.getDataEventsCount());
+      for (DataEventDependencyInfoProto protoEvent : proto.getDataEventsList()) {
+        this.dataEvents.add(DataEventDependencyInfo.fromProto(protoEvent));
+      }
     }
   }
 
@@ -163,8 +165,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
         + ", errorEnum=" + (error != null ? error.name() : "")
         + ", diagnostics=" + diagnostics
         + ", lastDataEventSourceTA=" + 
-              ((lastDataEventSourceTA==null) ? null:lastDataEventSourceTA.toString())
-        + ", lastDataEventTime=" + lastDataEventTime
+              ((dataEvents==null) ? 0:dataEvents.size())
         + ", counters=" + (tezCounters == null ? "null" :
           tezCounters.toString()
             .replaceAll("\\n", ", ").replaceAll("\\s+", " "));

http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index b32b324..411d677 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -530,9 +530,9 @@ public class HistoryEventJsonConversion {
     otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
     otherInfo.put(ATSConstants.COUNTERS,
         DAGUtils.convertCountersToJSON(event.getCounters()));
-    otherInfo.put(ATSConstants.LAST_DATA_EVENT_TIME, event.getLastDataEventTime());
-    if (event.getLastDataEventSourceTA() != null) {
-      otherInfo.put(ATSConstants.LAST_DATA_EVENT_SOURCE_TA, event.getLastDataEventSourceTA().toString());
+    if (event.getDataEvents() != null && !event.getDataEvents().isEmpty()) {
+      otherInfo.put(ATSConstants.LAST_DATA_EVENTS, 
+          DAGUtils.convertDataEventDependencyInfoToJSON(event.getDataEvents()));
     }
     jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 1447832..76e592e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
@@ -42,6 +43,8 @@ import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
 import org.apache.tez.dag.app.dag.impl.VertexStats;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
+import org.apache.tez.dag.history.logging.EntityTypes;
 import org.apache.tez.dag.records.TezTaskID;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -99,6 +102,27 @@ public class DAGUtils {
     }
     return dagJson;
   }
+  
+  public static JSONObject convertDataEventDependencyInfoToJSON(List<DataEventDependencyInfo> info) {
+    return new JSONObject(convertDataEventDependecyInfoToATS(info));
+  }
+  
+  public static Map<String, Object> convertDataEventDependecyInfoToATS(List<DataEventDependencyInfo> info) {
+    ArrayList<Object> infoList = new ArrayList<Object>();
+    for (DataEventDependencyInfo event : info) {
+      Map<String, Object> eventObj = new LinkedHashMap<String, Object>();
+      String id = "";
+      if (event.getTaskAttemptId() != null) {
+        id = event.getTaskAttemptId().toString();
+      }
+      eventObj.put(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), id);
+      eventObj.put(ATSConstants.TIMESTAMP, event.getTimestamp());
+      infoList.add(eventObj);
+    }
+    Map<String,Object> object = new LinkedHashMap<String, Object>();
+    putInto(object, ATSConstants.LAST_DATA_EVENTS, infoList);
+    return object;
+  }
 
   public static JSONObject convertCountersToJSON(TezCounters counters)
       throws JSONException {

http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index e268e0d..232f1b7 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -169,6 +169,11 @@ message TaskAttemptStartedProto {
   optional int64 allocation_time = 7;
 }
 
+message DataEventDependencyInfoProto {
+  optional string task_attempt_id = 1;
+  optional int64 timestamp = 2;
+}
+
 message TaskAttemptFinishedProto {
   optional string task_attempt_id = 1;
   optional int64 finish_time = 2;
@@ -176,8 +181,7 @@ message TaskAttemptFinishedProto {
   optional string diagnostics = 4;
   optional TezCountersProto counters = 5;
   optional string error_enum = 6;
-  optional int64 last_data_event_time = 7;
-  optional string last_data_event_source_t_a = 8;
+  repeated DataEventDependencyInfoProto data_events = 7;
 }
 
 message EventMetaDataProto {

http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
index 117d3b3..35893b3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
@@ -68,6 +68,7 @@ import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
@@ -79,6 +80,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventType;
@@ -247,6 +249,7 @@ public class TestTaskCommunicatorManager1 {
     final Event statusUpdateEvent = argAllValues.get(0);
     assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE,
         statusUpdateEvent.getType());
+    assertEquals(false, ((TaskAttemptEventStatusUpdate)statusUpdateEvent).getReadErrorReported());
 
     final Event vertexEvent = argAllValues.get(1);
     final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent;
@@ -256,9 +259,39 @@ public class TestTaskCommunicatorManager1 {
         vertexRouteEvent.getEvents().get(0).getEventType());
     assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT,
         vertexRouteEvent.getEvents().get(1).getEventType());
+  }
+  
+  @Test (timeout = 5000)
+  public void testTaskEventRoutingWithReadError() throws Exception {
+    List<TezEvent> events =  Arrays.asList(
+      new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null),
+      new TezEvent(InputReadErrorEvent.create("", 0, 0), null),
+      new TezEvent(new TaskAttemptCompletedEvent(), null)
+    );
+
+    generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>());
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(2)).handle(arg.capture());
+    final List<Event> argAllValues = arg.getAllValues();
+
+    final Event statusUpdateEvent = argAllValues.get(0);
+    assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE,
+        statusUpdateEvent.getType());
+    assertEquals(true, ((TaskAttemptEventStatusUpdate)statusUpdateEvent).getReadErrorReported());
+
+    final Event vertexEvent = argAllValues.get(1);
+    final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent;
+    assertEquals("First event should be routed to vertex", VertexEventType.V_ROUTE_EVENT,
+        vertexEvent.getType());
+    assertEquals(EventType.INPUT_READ_ERROR_EVENT,
+        vertexRouteEvent.getEvents().get(0).getEventType());
+    assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT,
+        vertexRouteEvent.getEvents().get(1).getEventType());
 
   }
 
+
   @Test (timeout = 5000)
   public void testTaskEventRoutingTaskAttemptOnly() throws Exception {
     List<TezEvent> events = Arrays.asList(

http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 101b22f..2d30a6f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -584,6 +585,92 @@ public class TestTaskAttempt {
   }
   
   @Test(timeout = 5000)
+  public void testLastDataEventRecording() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+
+    MockEventHandler eventHandler = spy(new MockEventHandler());
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
+
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
+    taskConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true);
+
+    locationHint = TaskLocationHint.createTaskLocationHint(
+        new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
+    Resource resource = Resource.newInstance(1024, 1);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    @SuppressWarnings("deprecation")
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
+        new ContainerContextMatcher(), appCtx);
+    containers.addContainerIfNew(container, 0, 0, 0);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, taskConf, new SystemClock(),
+        mockHeartbeatHandler, appCtx, false,
+        resource, createFakeContainerContext(), false);
+    TezTaskAttemptID taskAttemptID = taImpl.getID();
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
+    // At state STARTING.
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
+        null));
+    assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+    
+    long ts1 = 1024;
+    long ts2 = 2048;
+    TezTaskAttemptID mockId1 = mock(TezTaskAttemptID.class);
+    TezTaskAttemptID mockId2 = mock(TezTaskAttemptID.class);
+    TezEvent mockTezEvent1 = mock(TezEvent.class, RETURNS_DEEP_STUBS);
+    when(mockTezEvent1.getEventReceivedTime()).thenReturn(ts1);
+    when(mockTezEvent1.getSourceInfo().getTaskAttemptID()).thenReturn(mockId1);
+    TezEvent mockTezEvent2 = mock(TezEvent.class, RETURNS_DEEP_STUBS);
+    when(mockTezEvent2.getEventReceivedTime()).thenReturn(ts2);
+    when(mockTezEvent2.getSourceInfo().getTaskAttemptID()).thenReturn(mockId2);
+    TaskAttemptEventStatusUpdate statusEvent =
+        new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null));
+
+    assertEquals(0, taImpl.lastDataEvents.size());
+    taImpl.setLastEventSent(mockTezEvent1);
+    assertEquals(1, taImpl.lastDataEvents.size());
+    assertEquals(ts1, taImpl.lastDataEvents.get(0).getTimestamp());
+    assertEquals(mockId1, taImpl.lastDataEvents.get(0).getTaskAttemptId());
+    taImpl.handle(statusEvent);
+    taImpl.setLastEventSent(mockTezEvent2);
+    assertEquals(1, taImpl.lastDataEvents.size());
+    assertEquals(ts2, taImpl.lastDataEvents.get(0).getTimestamp());
+    assertEquals(mockId2, taImpl.lastDataEvents.get(0).getTaskAttemptId()); // over-write earlier value
+    statusEvent.setReadErrorReported(true);
+    taImpl.handle(statusEvent);
+    taImpl.setLastEventSent(mockTezEvent1);
+    assertEquals(2, taImpl.lastDataEvents.size());
+    assertEquals(ts1, taImpl.lastDataEvents.get(1).getTimestamp());
+    assertEquals(mockId1, taImpl.lastDataEvents.get(1).getTaskAttemptId()); // add new event
+    taImpl.setLastEventSent(mockTezEvent2);
+    assertEquals(2, taImpl.lastDataEvents.size());
+    assertEquals(ts2, taImpl.lastDataEvents.get(1).getTimestamp());
+    assertEquals(mockId2, taImpl.lastDataEvents.get(1).getTaskAttemptId()); // over-write earlier value
+  }
+  
+  @Test(timeout = 5000)
   public void testFailure() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(

http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index 6bbfc3d..53f1856 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -51,6 +51,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.history.HistoryEventType;
@@ -65,6 +66,8 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import com.google.common.collect.Lists;
+
 @SuppressWarnings({ "unchecked", "rawtypes" })
 public class TestTaskAttemptRecovery {
 
@@ -177,9 +180,12 @@ public class TestTaskAttemptRecovery {
 
     long lastDataEventTime = 1024;
     TezTaskAttemptID lastDataEventTA = mock(TezTaskAttemptID.class);
+    List<DataEventDependencyInfo> events = Lists.newLinkedList();
+    events.add(new DataEventDependencyInfo(lastDataEventTime, lastDataEventTA));
+    events.add(new DataEventDependencyInfo(lastDataEventTime, lastDataEventTA));
     TaskAttemptState recoveredState =
         ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            startTime, finishTime, state, errorEnum, diag, counters, lastDataEventTime, lastDataEventTA));
+            startTime, finishTime, state, errorEnum, diag, counters, events));
     assertEquals(startTime, ta.getLaunchTime());
     assertEquals(finishTime, ta.getFinishTime());
     assertEquals(counters, ta.reportedStatus.counters);
@@ -188,8 +194,9 @@ public class TestTaskAttemptRecovery {
     assertEquals(1, ta.getDiagnostics().size());
     assertEquals(diag, ta.getDiagnostics().get(0));
     assertEquals(state, recoveredState);
-    assertEquals(lastDataEventTime, ta.lastDataEventTime);
-    assertEquals(lastDataEventTA, ta.lastDataEventSourceTA);
+    assertEquals(events.size(), ta.lastDataEvents.size());
+    assertEquals(lastDataEventTime, ta.lastDataEvents.get(0).getTimestamp());
+    assertEquals(lastDataEventTA, ta.lastDataEvents.get(0).getTaskAttemptId());
     if (state != TaskAttemptState.SUCCEEDED) {
       assertEquals(errorEnum, ta.getTerminationCause());
     } else {
@@ -314,7 +321,7 @@ public class TestTaskAttemptRecovery {
     TaskAttemptState recoveredState =
         ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             startTime, finishTime, TaskAttemptState.KILLED,
-            TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters(), 0, null));
+            TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters(), null));
     assertEquals(TaskAttemptState.KILLED, recoveredState);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index eca8274..b6d4c10 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -286,7 +286,7 @@ public class TestTaskRecovery {
     restoreFromTaskStartEvent();
     TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
     task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-        0L, 0L, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"", new TezCounters(), 0, null));
+        0L, 0L, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"", new TezCounters(), null));
     task.handle(new TaskEventRecoverTask(task.getTaskId()));
     // wait for the second task attempt is scheduled
     dispatcher.await();
@@ -307,7 +307,7 @@ public class TestTaskRecovery {
     restoreFromTaskStartEvent();
     TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
     task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-        0L, 0L, TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"", new TezCounters(), 0, null));
+        0L, 0L, TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"", new TezCounters(), null));
     task.handle(new TaskEventRecoverTask(task.getTaskId()));
     // wait for the second task attempt is scheduled
     dispatcher.await();
@@ -329,7 +329,7 @@ public class TestTaskRecovery {
     TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
     try {
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-          0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters(), 0, null));
+          0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters(), null));
       fail("Should fail due to no TaskAttemptStartedEvent but with TaskAttemptFinishedEvent(Succeeded)");
     } catch (TezUncheckedException e) {
       assertTrue(e.getMessage().contains("Could not find task attempt when trying to recover"));
@@ -372,7 +372,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters(), 0, null));
+            "", new TezCounters(), null));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -405,7 +405,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.FAILED, null,
-            "", new TezCounters(), 0, null));
+            "", new TezCounters(), null));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -438,7 +438,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
-            "", new TezCounters(), 0, null));
+            "", new TezCounters(), null));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -473,7 +473,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters(), 0, null));
+            "", new TezCounters(), null));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -516,7 +516,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters(), 0, null));
+            "", new TezCounters(), null));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -528,7 +528,7 @@ public class TestTaskRecovery {
     recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.FAILED, null,
-            "", new TezCounters(), 0, null));
+            "", new TezCounters(), null));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -563,7 +563,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters(), 0, null));
+            "", new TezCounters(), null));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -575,7 +575,7 @@ public class TestTaskRecovery {
     recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
-            "", new TezCounters(), 0, null));
+            "", new TezCounters(), null));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -614,7 +614,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters(), 0, null));
+            "", new TezCounters(), null));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -654,7 +654,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters(), 0, null));
+            "", new TezCounters(), null));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -735,7 +735,7 @@ public class TestTaskRecovery {
     recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
-            "", new TezCounters(), 0, null));
+            "", new TezCounters(), null));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(TaskAttemptStateInternal.NEW,
         ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState());
@@ -776,7 +776,7 @@ public class TestTaskRecovery {
       task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
           mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0));
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
-          0, TaskAttemptState.KILLED, null, "", null, 0, null));
+          0, TaskAttemptState.KILLED, null, "", null, null));
     }
     assertEquals(maxFailedAttempts, task.getAttempts().size());
     assertEquals(0, task.failedAttempts);
@@ -806,7 +806,7 @@ public class TestTaskRecovery {
       task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
           mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0));
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
-          0, TaskAttemptState.FAILED, null, "", null, 0, null));
+          0, TaskAttemptState.FAILED, null, "", null, null));
     }
     assertEquals(maxFailedAttempts, task.getAttempts().size());
     assertEquals(maxFailedAttempts, task.failedAttempts);
@@ -836,7 +836,7 @@ public class TestTaskRecovery {
       task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
           mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0));
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
-          0, TaskAttemptState.FAILED, null, "", null, 0, null));
+          0, TaskAttemptState.FAILED, null, "", null, null));
     }
     assertEquals(maxFailedAttempts - 1, task.getAttempts().size());
     assertEquals(maxFailedAttempts - 1, task.failedAttempts);

http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index b215a06..5c8c90e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -48,6 +48,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
 import org.apache.tez.dag.app.dag.impl.VertexStats;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
@@ -508,9 +509,7 @@ public class TestHistoryEventsProtoConversion {
           TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
           "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
-          null, null, null, 1024, 
-          TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 110), 1), 1));
+          null, null, null, null);
       TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(event.getTaskAttemptID(),
@@ -523,16 +522,20 @@ public class TestHistoryEventsProtoConversion {
           deserializedEvent.getState());
       Assert.assertEquals(event.getCounters(),
           deserializedEvent.getCounters());
-      Assert.assertEquals(event.getLastDataEventTime(), deserializedEvent.getLastDataEventTime());
-      Assert.assertEquals(event.getLastDataEventSourceTA(), deserializedEvent.getLastDataEventSourceTA());
       logEvents(event, deserializedEvent);
     }
     {
+      TezTaskAttemptID taId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+          TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 0), 0);
+      long timestamp = 1024L;
+      List<DataEventDependencyInfo> events = Lists.newArrayList();
+      events.add(new DataEventDependencyInfo(timestamp, taId));
+      events.add(new DataEventDependencyInfo(timestamp, taId));
       TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(
           TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
           "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
-          TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters(), 0, null);
+          TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters(), events);
       TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(event.getTaskAttemptID(),
@@ -547,6 +550,9 @@ public class TestHistoryEventsProtoConversion {
           deserializedEvent.getCounters());
       Assert.assertEquals(event.getTaskAttemptError(),
           deserializedEvent.getTaskAttemptError());
+      Assert.assertEquals(events.size(), deserializedEvent.getDataEvents().size()); // check round-tripped copy
+      Assert.assertEquals(events.get(0).getTimestamp(), deserializedEvent.getDataEvents().get(0).getTimestamp());
+      Assert.assertEquals(events.get(0).getTaskAttemptId(), deserializedEvent.getDataEvents().get(0).getTaskAttemptId());
       logEvents(event, deserializedEvent);
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index 9c11dc7..711e4bb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -164,7 +164,7 @@ public class TestHistoryEventJsonConversion {
           break;
         case TASK_ATTEMPT_FINISHED:
           event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
-              random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null, 0, null);
+              random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null, null);
           break;
         case CONTAINER_LAUNCHED:
           event = new ContainerLaunchedEvent(containerId, random.nextInt(),

http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
index ccec0db..ca008ce 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
@@ -20,6 +20,7 @@ package org.apache.tez.history.parser.datamodel;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 
@@ -28,10 +29,13 @@ import org.apache.tez.common.ATSConstants;
 import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.history.parser.utils.Utils;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
+import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.hadoop.classification.InterfaceStability.Evolving;
@@ -53,14 +57,29 @@ public class TaskAttemptInfo extends BaseInfo {
   private final String status;
   private final String logUrl;
   private final String creationCausalTA;
-  private final long lastDataEventTime;
-  private final String lastDataEventSourceTA;
   private final String terminationCause;
   private final long executionTimeInterval;
+  // this list is in time order - array list for easy walking
+  private final ArrayList<DataDependencyEvent> lastDataEvents = Lists.newArrayList();
 
   private TaskInfo taskInfo;
 
   private Container container;
+  
+  public static class DataDependencyEvent { // value holder: task attempt id string + timestamp
+    String taId;
+    long timestamp;
+    public DataDependencyEvent(String id, long time) {
+      this.timestamp = time;
+      this.taId = id;
+    }
+    public String getTaskAttemptId() {
+      return this.taId;
+    }
+    public long getTimestamp() {
+      return this.timestamp;
+    }
+  }
 
   TaskAttemptInfo(JSONObject jsonObject) throws JSONException {
     super(jsonObject);
@@ -87,9 +106,17 @@ public class TaskAttemptInfo extends BaseInfo {
 
     status = StringInterner.weakIntern(otherInfoNode.optString(Constants.STATUS));
     container = new Container(containerId, nodeId);
-    lastDataEventTime = otherInfoNode.optLong(ATSConstants.LAST_DATA_EVENT_TIME);
-    lastDataEventSourceTA = StringInterner.weakIntern(
-        otherInfoNode.optString(ATSConstants.LAST_DATA_EVENT_SOURCE_TA));
+    if (otherInfoNode.has(Constants.LAST_DATA_EVENTS)) {
+      List<DataDependencyEvent> eventInfo = Utils.parseDataEventDependencyFromJSON(
+          otherInfoNode.optJSONObject(Constants.LAST_DATA_EVENTS));
+      long lastTime = 0;
+      for (DataDependencyEvent item : eventInfo) {
+        // events must be in non-decreasing time order; equal timestamps (same-ms ties) are accepted
+        Preconditions.checkState(lastTime <= item.getTimestamp());
+        lastTime = item.getTimestamp();
+        lastDataEvents.add(item);
+      }
+    }
     terminationCause = StringInterner
         .weakIntern(otherInfoNode.optString(ATSConstants.TASK_ATTEMPT_ERROR_ENUM));
     executionTimeInterval = (endTime > startTime) ? (endTime - startTime) : 0;
@@ -121,6 +148,10 @@ public class TaskAttemptInfo extends BaseInfo {
     return endTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime());
   }
   
+  public final List<DataDependencyEvent> getLastDataEvents() { // returns the internal list itself, not a copy
+    return lastDataEvents;
+  }
+  
   public final long getExecutionTimeInterval() {
     return executionTimeInterval;
   }
@@ -149,14 +180,17 @@ public class TaskAttemptInfo extends BaseInfo {
     return creationTime;
   }
   
-  public final long getLastDataEventTime() {
-    return lastDataEventTime;
+  public final DataDependencyEvent getLastDataEventInfo(long timeThreshold) {
+    // scan from the end of the list toward the front; the first entry older than the threshold wins
+    int idx = lastDataEvents.size();
+    while (--idx >= 0) {
+      if (lastDataEvents.get(idx).getTimestamp() < timeThreshold) {
+        return lastDataEvents.get(idx);
+      }
+    }
+    return null;
   }
   
-  public final String getLastDataEventSourceTA() {
-    return lastDataEventSourceTA;
-  }
-
   public final long getTimeTaken() {
     return getFinishTimeInterval() - getStartTimeInterval();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
index 7345012..ffb854a 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
@@ -19,7 +19,10 @@
 package org.apache.tez.history.parser.utils;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.log4j.ConsoleAppender;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -27,8 +30,10 @@ import org.apache.log4j.PatternLayout;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.history.logging.EntityTypes;
 import org.apache.tez.history.parser.datamodel.Constants;
 import org.apache.tez.history.parser.datamodel.Event;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo.DataDependencyEvent;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -83,6 +88,19 @@ public class Utils {
     }
     return counters;
   }
+  
+  /** Parses the LAST_DATA_EVENTS array of jsonObject; yields an empty list if the object or array is absent. */
+  public static List<DataDependencyEvent> parseDataEventDependencyFromJSON(JSONObject jsonObject)
+      throws JSONException {
+    List<DataDependencyEvent> events = Lists.newArrayList();
+    JSONArray fields = (jsonObject == null) ? null : jsonObject.optJSONArray(Constants.LAST_DATA_EVENTS);
+    for (int i = 0; fields != null && i < fields.length(); i++) {
+      JSONObject eventMap = fields.getJSONObject(i);
+      String taId = StringInterner.weakIntern(eventMap.optString(EntityTypes.TEZ_TASK_ATTEMPT_ID.name()));
+      events.add(new DataDependencyEvent(taId, eventMap.optLong(Constants.TIMESTAMP)));
+    }
+    return events;
+  }
 
   /**
    * Parse events from json

http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
index 2b797a5..8dbfdc9 100644
--- a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
+++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
@@ -63,6 +63,7 @@ import org.apache.tez.history.parser.datamodel.BaseInfo;
 import org.apache.tez.history.parser.datamodel.DagInfo;
 import org.apache.tez.history.parser.datamodel.EdgeInfo;
 import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo.DataDependencyEvent;
 import org.apache.tez.history.parser.datamodel.TaskInfo;
 import org.apache.tez.history.parser.datamodel.VersionInfo;
 import org.apache.tez.history.parser.datamodel.VertexInfo;
@@ -77,7 +78,6 @@ import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.apache.tez.tests.MiniTezClusterWithTimeline;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -288,12 +288,14 @@ public class TestHistoryParser {
           }
         } else {
           for (TaskAttemptInfo attempt : attempts) {
-            assertTrue(attempt.getLastDataEventTime() > 0);
+            DataDependencyEvent item = attempt.getLastDataEvents().get(0);
+            assertTrue(item.getTimestamp() > 0);
+            
             if (lastDataEventSourceTA == null) {
-              lastDataEventSourceTA = attempt.getLastDataEventSourceTA();
+              lastDataEventSourceTA = item.getTaskAttemptId();
             } else {
               // all attempts should have the same last data event source TA
-              assertTrue(lastDataEventSourceTA.equals(attempt.getLastDataEventSourceTA()));
+              assertTrue(lastDataEventSourceTA.equals(item.getTaskAttemptId()));
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 1b7e183..4685a61 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -435,10 +435,9 @@ public class HistoryEventTimelineConversion {
     atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
     atsEntity.addOtherInfo(ATSConstants.COUNTERS,
         DAGUtils.convertCountersToATSMap(event.getCounters()));
-    atsEntity.addOtherInfo(ATSConstants.LAST_DATA_EVENT_TIME, event.getLastDataEventTime());
-    if (event.getLastDataEventSourceTA() != null) {
-      atsEntity.addOtherInfo(ATSConstants.LAST_DATA_EVENT_SOURCE_TA,
-          event.getLastDataEventSourceTA().toString());
+    if (event.getDataEvents() != null && !event.getDataEvents().isEmpty()) {
+      atsEntity.addOtherInfo(ATSConstants.LAST_DATA_EVENTS, 
+          DAGUtils.convertDataEventDependecyInfoToATS(event.getDataEvents()));
     }
     return atsEntity;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 8db32b0..2849c10 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.history.logging.ats;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
@@ -45,6 +46,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
 import org.apache.tez.dag.app.dag.impl.VertexStats;
 import org.apache.tez.dag.app.web.AMWebController;
 import org.apache.tez.dag.history.HistoryEvent;
@@ -85,6 +87,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
 public class TestHistoryEventTimelineConversion {
 
   private ApplicationAttemptId applicationAttemptId;
@@ -170,7 +174,7 @@ public class TestHistoryEventTimelineConversion {
           break;
         case TASK_ATTEMPT_FINISHED:
           event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
-              random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null, 0, null);
+              random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null, null);
           break;
         case CONTAINER_LAUNCHED:
           event = new ContainerLaunchedEvent(containerId, random.nextInt(),
@@ -445,6 +449,7 @@ public class TestHistoryEventTimelineConversion {
         timelineEntity.getOtherInfo().get(ATSConstants.USER));
   }
 
+  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testConvertTaskAttemptFinishedEvent() {
     String vertexName = "testVertex";
@@ -457,9 +462,12 @@ public class TestHistoryEventTimelineConversion {
     String diagnostics = "random diagnostics message";
     TezCounters counters = new TezCounters();
     long lastDataEventTime = finishTime - 1;
+    List<DataEventDependencyInfo> events = Lists.newArrayList();
+    events.add(new DataEventDependencyInfo(lastDataEventTime, tezTaskAttemptID));
+    events.add(new DataEventDependencyInfo(lastDataEventTime, tezTaskAttemptID));
 
     TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName,
-        startTime, finishTime, state, error, diagnostics, counters, lastDataEventTime, tezTaskAttemptID);
+        startTime, finishTime, state, error, diagnostics, counters, events);
     TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
     Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId());
     Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
@@ -482,14 +490,17 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertEquals(finishTime, evt.getTimestamp());
 
     final Map<String, Object> otherInfo = timelineEntity.getOtherInfo();
-    Assert.assertEquals(8, otherInfo.size());
+    Assert.assertEquals(7, otherInfo.size());
     Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME));
     Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN));
     Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS));
     Assert.assertEquals(error.name(), otherInfo.get(ATSConstants.TASK_ATTEMPT_ERROR_ENUM));
     Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS));
-    Assert.assertEquals(lastDataEventTime, otherInfo.get(ATSConstants.LAST_DATA_EVENT_TIME));
-    Assert.assertEquals(tezTaskAttemptID.toString(), otherInfo.get(ATSConstants.LAST_DATA_EVENT_SOURCE_TA));
+    Map<String, Object> obj1 = (Map<String, Object>)otherInfo.get(ATSConstants.LAST_DATA_EVENTS);
+    List<Object> obj2 = (List<Object>) obj1.get(ATSConstants.LAST_DATA_EVENTS);
+    Assert.assertEquals(2, obj2.size());
+    Map<String, Object> obj3 = (Map<String, Object>) obj2.get(0);
+    Assert.assertEquals(events.get(0).getTimestamp(), obj3.get(ATSConstants.TIMESTAMP));
     Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS));
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
index c8d4225..350f783 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
@@ -33,11 +33,11 @@ import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.Ent
 import org.apache.tez.analyzer.utils.SVGUtils;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
-import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.history.parser.datamodel.Container;
 import org.apache.tez.history.parser.datamodel.DagInfo;
 import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
 import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo.DataDependencyEvent;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -165,8 +165,10 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
               .getAvgExecutionTimeInterval();
           if (avgExecutionTime * 1.25 < attempt.getExecutionTimeInterval()) {
             step.notes
-                .add("Potential straggler. Execution time " + attempt.getExecutionTimeInterval()
-                    + " compared to vertex average of " + avgExecutionTime);
+                .add("Potential straggler. Execution time " + 
+                    SVGUtils.getTimeStr(attempt.getExecutionTimeInterval())
+                    + " compared to vertex average of " + 
+                    SVGUtils.getTimeStr(avgExecutionTime));
           }
           
           if (attempt.getStartTime() > step.startCriticalPathTime) {
@@ -231,14 +233,44 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
         Preconditions.checkState(currentAttemptStopCriticalPathTime > 0);
         System.out.println(
             "Step: " + tempCP.size() + " Attempt: " + currentAttempt.getTaskAttemptId());
+        
         currentStep = new CriticalPathStep(currentAttempt, EntityType.ATTEMPT);
         currentStep.stopCriticalPathTime = currentAttemptStopCriticalPathTime;
+
+        // consider the last data event seen immediately preceding the current critical path 
+        // stop time for this attempt
+        long currentStepLastDataEventTime = 0;
+        String currentStepLastDataTA = null;
+        DataDependencyEvent item = currentAttempt.getLastDataEventInfo(currentStep.stopCriticalPathTime);
+        if (item!=null) {
+          currentStepLastDataEventTime = item.getTimestamp();
+          currentStepLastDataTA = item.getTaskAttemptId();
+        }
+
+        // sanity check
+        for (CriticalPathStep previousStep : tempCP) {
+          if (previousStep.type == EntityType.ATTEMPT) {
+            if (previousStep.attempt.getTaskAttemptId().equals(currentAttempt.getTaskAttemptId())) {
+              // found loop.
+              // this should only happen for read errors in currentAttempt
+              List<DataDependencyEvent> dataEvents = currentAttempt.getLastDataEvents();
+              Preconditions.checkState(dataEvents.size() > 1); // received
+                                                               // original and
+                                                               // retry data events
+              Preconditions.checkState(currentStepLastDataEventTime < dataEvents
+                  .get(dataEvents.size() - 1).getTimestamp()); // new event is
+                                                               // earlier than
+                                                               // last
+            }
+          }
+        }
+
         tempCP.add(currentStep);
   
         // find the next attempt on the critical path
         boolean dataDependency = false;
         // find out predecessor dependency
-        if (currentAttempt.getLastDataEventTime() > currentAttempt.getCreationTime()) {
+        if (currentStepLastDataEventTime > currentAttempt.getCreationTime()) {
           dataDependency = true;
         }
   
@@ -248,13 +280,13 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
         if (dataDependency) {
           // last data event was produced after the attempt was scheduled. use
           // data dependency
-          // typically case when scheduling ahead of time
+          // typically the case when scheduling ahead of time
           System.out.println("Has data dependency");
-          if (!Strings.isNullOrEmpty(currentAttempt.getLastDataEventSourceTA())) {
+          if (!Strings.isNullOrEmpty(currentStepLastDataTA)) {
             // there is a valid data causal TA. Use it.
-            nextAttemptId = currentAttempt.getLastDataEventSourceTA();
+            nextAttemptId = currentStepLastDataTA;
             reason = CriticalPathDependency.DATA_DEPENDENCY;
-            startCriticalPathTime = currentAttempt.getLastDataEventTime();
+            startCriticalPathTime = currentStepLastDataEventTime;
             System.out.println("Using data dependency " + nextAttemptId);
           } else {
             // there is no valid data causal TA. This means data event came from the same vertex
@@ -289,20 +321,30 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
                 }
               }
             }
-            startCriticalPathTime = currentAttempt.getCreationTime();
+            if (reason == CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY) {
+              // rescheduled due to read error. start critical path at read error report time.
+              // for now use own creation time as a proxy for the read error report time
+              startCriticalPathTime = currentAttempt.getCreationTime();
+            } else {
+              // rescheduled due to own previous attempt failure
+              // we are critical when the previous attempt fails
+              Preconditions.checkState(nextAttempt != null);
+              Preconditions.checkState(nextAttempt.getTaskInfo().getTaskId().equals(
+                  currentAttempt.getTaskInfo().getTaskId()));
+              startCriticalPathTime = nextAttempt.getFinishTime();
+            }
             System.out.println("Using scheduling dependency " + nextAttemptId);
           } else {
             // there is no scheduling causal TA.
-            if (!Strings.isNullOrEmpty(currentAttempt.getLastDataEventSourceTA())) {
+            if (!Strings.isNullOrEmpty(currentStepLastDataTA)) {
               // there is a data event going to the vertex. Count the time between data event and
-              // scheduling time as Initializer/Manager overhead and follow data dependency
-              nextAttemptId = currentAttempt.getLastDataEventSourceTA();
+              // creation time as Initializer/Manager overhead and follow data dependency
+              nextAttemptId = currentStepLastDataTA;
               reason = CriticalPathDependency.DATA_DEPENDENCY;
-              startCriticalPathTime = currentAttempt.getLastDataEventTime();
-              long overhead = currentAttempt.getCreationTime()
-                  - currentAttempt.getLastDataEventTime();
+              startCriticalPathTime = currentStepLastDataEventTime;
+              long overhead = currentAttempt.getCreationTime() - currentStepLastDataEventTime;
               currentStep.notes
-                  .add("Initializer/VertexManager scheduling overhead " + overhead + " ms");
+                  .add("Initializer/VertexManager scheduling overhead " + SVGUtils.getTimeStr(overhead));
               System.out.println("Using data dependency " + nextAttemptId);
             } else {
               // there is no scheduling causal TA and no data event casual TA.
@@ -316,89 +358,6 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
           }
         }
 
-        
-        if (!Strings.isNullOrEmpty(nextAttemptId)) {
-          TaskAttemptInfo nextAttempt = attempts.get(nextAttemptId);
-          TaskAttemptInfo attemptToCheck = nextAttempt;
-
-          // check if the next attempt is already on critical path to prevent infinite loop
-          boolean foundLoop = false;
-          CriticalPathDependency prevReason = null;
-          for (CriticalPathStep previousStep : tempCP) {
-            if (previousStep.attempt.equals(attemptToCheck)) {
-              foundLoop = true;
-              prevReason = previousStep.reason;
-            }
-          }
-
-          if (foundLoop) {
-            // found a loop - find the next step based on heuristics
-            /* only the losing outputs causes us to backtrack. There are 2 cases
-            * 1) Step N reported last data event to this step 
-            *    -> Step N+1 (current step) is the retry for read error reported
-            *    -> read error was reported by the Step N attempt and it did not exit after the 
-            *       error
-            *    -> So scheduling dependency of Step N points back to step N+1
-            * 2) Step N reported last data event to this step
-            *     -> Step N+1 is a retry for a read error reported
-            *     -> Step N+2 is the attempt that reported the read error
-            *     -> Step N+3 is the last data event of N+2 and points back to N+1
-            */
-            System.out.println("Reset " + currentAttempt.getTaskAttemptId() 
-            + " cause: " + currentAttempt.getTerminationCause() 
-            + " time: " + currentAttempt.getFinishTime()
-            + " reason: " + reason
-            + " because of: " + attemptToCheck.getTaskAttemptId());
-            TaskAttemptInfo attemptWithLostAncestor = currentAttempt;
-            if (reason != CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY) {
-              // Case 2 above. If reason == CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY
-              // then its Case 1 above
-              Preconditions.checkState(prevReason.equals(
-                  CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), prevReason);
-              reason = CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY;
-              attemptWithLostAncestor = nextAttempt;
-            }
-            System.out.println("Reset " + currentAttempt.getTaskAttemptId() 
-            + " cause: " + currentAttempt.getTerminationCause() 
-            + " time: " + currentAttempt.getFinishTime()
-            + " reason: " + reason
-            + " because of: " + attemptToCheck.getTaskAttemptId()
-            + " looking at: " + attemptWithLostAncestor.getTaskAttemptId());
-            Preconditions.checkState(reason == CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY);
-            // we dont track all input events to the consumer. So just jump to
-            // the previous successful version of the current attempt
-            TaskAttemptInfo prevSuccAttempt = null;
-            for (TaskAttemptInfo prevAttempt : attemptWithLostAncestor.getTaskInfo().getTaskAttempts()) {
-              System.out.println("Looking at " + prevAttempt.getTaskAttemptId() 
-              + " cause: " + prevAttempt.getTerminationCause() + 
-              " time: " + prevAttempt.getFinishTime());
-              if (prevAttempt.getTerminationCause()
-                  .equals(TaskAttemptTerminationCause.OUTPUT_LOST.name())) {
-                if (prevAttempt.getFinishTime() < currentAttempt.getFinishTime()) {
-                  // attempt finished before current attempt
-                  if (prevSuccAttempt == null
-                      || prevAttempt.getFinishTime() > prevSuccAttempt.getFinishTime()) {
-                    // keep the latest attempt that had lost outputs
-                    prevSuccAttempt = prevAttempt;
-                  }
-                }
-              }
-            }
-            Preconditions.checkState(prevSuccAttempt != null,
-                attemptWithLostAncestor.getTaskAttemptId());
-            System.out
-                .println("Resetting nextAttempt to : " + prevSuccAttempt.getTaskAttemptId()
-                    + " from " + nextAttempt.getTaskAttemptId());
-            nextAttemptId = prevSuccAttempt.getTaskAttemptId();
-            if (attemptWithLostAncestor == currentAttempt) {
-              startCriticalPathTime = currentAttempt.getCreationTime();
-            } else {
-              startCriticalPathTime = prevSuccAttempt.getFinishTime();
-            }
-          }
-          
-        }
-
         currentStep.startCriticalPathTime = startCriticalPathTime;
         currentStep.reason = reason;
         

http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
index 9a75461..f3a69a6 100644
--- a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
@@ -182,7 +182,7 @@ public class TestAnalyzer {
   DagInfo runDAGAndVerify(DAG dag, DAGStatus.State finalState, List<StepCheck[]> steps) throws Exception {
     tezSession.waitTillReady();
     numDAGs++;
-    LOG.info("XXX Running DAG name: " + dag.getName());
+    LOG.info("ABC Running DAG name: " + dag.getName());
     DAGClient dagClient = tezSession.submitDAG(dag);
     DAGStatus dagStatus = dagClient.getDAGStatus(null);
     while (!dagStatus.isCompleted()) {
@@ -230,13 +230,13 @@ public class TestAnalyzer {
     List<CriticalPathStep> criticalPath = cp.getCriticalPath();
 
     for (CriticalPathStep step : criticalPath) {
-      LOG.info("XXX Step: " + step.getType());
+      LOG.info("ABC Step: " + step.getType());
       if (step.getType() == EntityType.ATTEMPT) {
-        LOG.info("XXX Attempt: " + step.getAttempt().getShortName() + " " + step.getAttempt().getDetailedStatus());
+        LOG.info("ABC Attempt: " + step.getAttempt().getShortName() + " " + step.getAttempt().getDetailedStatus());
       }
-      LOG.info("XXX Reason: " + step.getReason());
+      LOG.info("ABC Reason: " + step.getReason());
       String notes = Joiner.on(";").join(step.getNotes());
-      LOG.info("XXX Notes: " + notes);
+      LOG.info("ABC Notes: " + notes);
     }
 
     boolean foundMatchingLength = false;
@@ -361,6 +361,7 @@ public class TestAnalyzer {
 
     StepCheck[] check = {
         createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
         createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
         createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
       };
@@ -416,7 +417,9 @@ public class TestAnalyzer {
 
     StepCheck[] check = {
         createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
         createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
         createStep("v1 : 000000_2", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
         createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
       };
@@ -484,6 +487,9 @@ public class TestAnalyzer {
 
     StepCheck[] check = {
         createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
+        createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v2 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
         createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
         createStep("v2 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY),
         createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
@@ -553,7 +559,13 @@ public class TestAnalyzer {
     StepCheck[] check = {
         // use regex for either vertices being possible on the path
         createStep("v[12] : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
-        createStep("v[12] : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v[12] : 000000_[01]", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v[12] : 000000_[012]", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
+        createStep("v[12] : 000000_[12]", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+        createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
         createStep("v[12] : 000000_2", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
         createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
       };
@@ -643,17 +655,11 @@ public class TestAnalyzer {
     StepCheck[] check1 = {
         // use regex for either vertices being possible on the path
       createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
-      createStep("v[23] : 000000_0", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
+      createStep("v[23] : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
       createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
       createStep("v[23] : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
     };
-    StepCheck[] check2 = {
-        createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
-        createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
-        createStep("v[23] : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
-    };
     stepsOptions.add(check1);
-    stepsOptions.add(check2);
     DAG dag = SimpleReverseVTestDAG.createDAG(
             "testInputFailureRerunCanSendOutputToTwoDownstreamVertices", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, stepsOptions);