You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/09/09 22:59:08 UTC

[2/2] tez git commit: TEZ-2789. Backport events added in TEZ-2612 to branch-0.7 (bikas)

TEZ-2789. Backport events added in TEZ-2612 to branch-0.7 (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bc56ca31
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bc56ca31
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bc56ca31

Branch: refs/heads/branch-0.7
Commit: bc56ca3157973971b7e0e21ed834d56ecc7cdd46
Parents: 9be8cd4
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Sep 9 13:54:23 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Sep 9 13:54:23 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/common/ATSConstants.java     |   4 +
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  19 ++-
 .../org/apache/tez/dag/app/dag/TaskAttempt.java |   3 +
 .../dag/event/TaskAttemptEventStatusUpdate.java |   9 ++
 .../dag/app/dag/event/TaskEventTAUpdate.java    |  14 +-
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |  13 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 138 +++++++++++++++++--
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  61 ++++++--
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  27 +++-
 .../tez/dag/app/dag/impl/VertexManager.java     |   3 +-
 .../tez/dag/app/rm/container/AMContainer.java   |   1 +
 .../dag/app/rm/container/AMContainerImpl.java   |  12 ++
 .../events/TaskAttemptFinishedEvent.java        |  32 ++++-
 .../history/events/TaskAttemptStartedEvent.java |  52 +++++--
 .../VertexRecoverableEventsGeneratedEvent.java  |   3 +-
 .../impl/HistoryEventJsonConversion.java        |   9 ++
 .../apache/tez/dag/history/utils/DAGUtils.java  |  24 ++++
 tez-dag/src/main/proto/HistoryEvents.proto      |  10 ++
 .../apache/tez/dag/app/MockDAGAppMaster.java    |  10 +-
 .../tez/dag/app/TestMockDAGAppMaster.java       |   6 +-
 .../app/TestTaskAttemptListenerImplTezDag.java  |  38 ++++-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |  89 ++++++++++++
 .../app/dag/impl/TestTaskAttemptRecovery.java   |  27 +++-
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |  41 +++++-
 .../tez/dag/app/dag/impl/TestTaskRecovery.java  |  46 +++----
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  31 ++++-
 .../dag/app/rm/container/TestAMContainer.java   |   5 +-
 .../TestHistoryEventsProtoConversion.java       |  30 +++-
 .../impl/TestHistoryEventJsonConversion.java    |   4 +-
 .../ats/HistoryEventTimelineConversion.java     |  11 +-
 .../ats/TestHistoryEventTimelineConversion.java |  31 ++++-
 .../apache/tez/runtime/api/impl/TezEvent.java   |  17 +++
 33 files changed, 710 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 812004c..90935c7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2789. Backport events added in TEZ-2612 to branch-0.7
   TEZ-2766. Tez UI: Add vertex in-progress info in DAG details
   TEZ-2768. Log a useful error message when the summary stream cannot be closed when shutting
   down an AM.

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index a85dbd9..f786a4e 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -58,6 +58,8 @@ public class ATSConstants {
   public static final String VERTEX_NAME = "vertexName";
   public static final String VERTEX_NAME_ID_MAPPING = "vertexNameIdMapping";
   public static final String SCHEDULED_TIME = "scheduledTime";
+  public static final String CREATION_TIME = "creationTime";
+  public static final String ALLOCATION_TIME = "allocationTime";
   public static final String INIT_REQUESTED_TIME = "initRequestedTime";
   public static final String INIT_TIME = "initTime";
   public static final String START_REQUESTED_TIME = "startRequestedTime";
@@ -82,7 +84,9 @@ public class ATSConstants {
   public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL";
   public static final String COMPLETED_LOGS_URL = "completedLogsURL";
   public static final String EXIT_STATUS = "exitStatus";
+  public static final String LAST_DATA_EVENTS = "lastDataEvents";
   public static final String UPDATED_EDGE_MANAGERS = "updatedEdgeManagers";
+  public static final String CREATION_CAUSAL_ATTEMPT = "creationCausalAttempt";
 
   /* Counters-related keys */
   public static final String COUNTER_GROUPS = "counterGroups";

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index ddbee59..5ef89f6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.commons.collections4.ListUtils;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventType;
 import org.slf4j.Logger;
@@ -428,22 +427,34 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
               " events: " + (inEvents != null? inEvents.size() : -1));
         }
 
+        long currTime = context.getClock().getTime();
         List<TezEvent> otherEvents = new ArrayList<TezEvent>();
         // route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events
         // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT)
         // to VertexImpl to ensure the events ordering
         //  1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent
         //  2. TaskStatusEvent is handled before TaskAttemptFinishedEvent
+        TaskAttemptEventStatusUpdate taskAttemptEvent = null;
+        boolean readErrorReported = false;
         for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
+          // for now, set the event time on the AM when it is received.
+          // this avoids any time disparity between machines.
+          tezEvent.setEventReceivedTime(currTime);
           final EventType eventType = tezEvent.getEventType();
           if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
-            TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
-                    (TaskStatusUpdateEvent) tezEvent.getEvent());
-            context.getEventHandler().handle(taskAttemptEvent);
+            taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
+                (TaskStatusUpdateEvent) tezEvent.getEvent());
           } else {
+            if (eventType == EventType.INPUT_READ_ERROR_EVENT) {
+              readErrorReported = true;
+            }
             otherEvents.add(tezEvent);
           }
         }
+        if (taskAttemptEvent != null) {
+          taskAttemptEvent.setReadErrorReported(readErrorReported);
+          context.getEventHandler().handle(taskAttemptEvent);
+        }
         if(!otherEvents.isEmpty()) {
           TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
           context.getEventHandler().handle(

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index 6c85cc2..4360cc3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -34,6 +34,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.impl.TezEvent;
 
 /**
  * Read only view of TaskAttempt.
@@ -79,6 +80,8 @@ public interface TaskAttempt {
   float getProgress();
   TaskAttemptState getState();
   TaskAttemptState getStateNoLock();
+  
+  void setLastEventSent(TezEvent lastEventSent);
 
   /** 
    * Has attempt reached the final state or not.

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
index c5a6ea7..458679c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
@@ -24,6 +24,7 @@ import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent {
   
   private TaskStatusUpdateEvent taskAttemptStatus;
+  private boolean readErrorReported = false;
   
   public TaskAttemptEventStatusUpdate(TezTaskAttemptID id,
       TaskStatusUpdateEvent statusEvent) {
@@ -34,4 +35,12 @@ public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent {
   public TaskStatusUpdateEvent getStatusEvent() {
     return this.taskAttemptStatus;
   }
+  
+  public void setReadErrorReported(boolean value) {
+    readErrorReported = value;
+  }
+  
+  public boolean getReadErrorReported() {
+    return readErrorReported;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java
index 59c7363..01eaf5b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java
@@ -18,19 +18,31 @@
 
 package org.apache.tez.dag.app.dag.event;
 
+import org.apache.tez.common.TezAbstractEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
+@SuppressWarnings("rawtypes")
 public class TaskEventTAUpdate extends TaskEvent {
 
   private TezTaskAttemptID attemptID;
+  private TezAbstractEvent causalEvent;
 
   public TaskEventTAUpdate(TezTaskAttemptID id, TaskEventType type) {
+    this(id, type, null);
+  }
+
+  public TaskEventTAUpdate(TezTaskAttemptID id, TaskEventType type, TezAbstractEvent causalEvent) {
     super(id.getTaskID(), type);
     this.attemptID = id;
+    this.causalEvent = causalEvent;
   }
-
+  
   public TezTaskAttemptID getTaskAttemptID() {
     return attemptID;
   }
+  
+  public TezAbstractEvent getCausalEvent() {
+    return causalEvent;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 576e1cf..0be7790 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -382,7 +382,7 @@ public class Edge {
     EventMetaData srcInfo = tezEvent.getSourceInfo();
     
     for (DataMovementEvent dmEvent : compEvent.getEvents()) {
-      TezEvent newEvent = new TezEvent(dmEvent, srcInfo);
+      TezEvent newEvent = new TezEvent(dmEvent, srcInfo, tezEvent.getEventReceivedTime());
       sendTezEventToDestinationTasks(newEvent);
     }
   }
@@ -411,7 +411,7 @@ public class Edge {
             InputFailedEvent ifEvent = ((InputFailedEvent) event);
             e = InputFailedEvent.create(inputIndex, ifEvent.getVersion());
           }
-          tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+          tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(), tezEvent.getEventReceivedTime());
           tezEventToSend.setDestinationInfo(destinationMetaInfo);
           // cache the event object per input because are unique per input index
           inputIndicesWithEvents.put(inputIndex, tezEventToSend);
@@ -558,7 +558,8 @@ public class Edge {
                 DataMovementEvent e = compEvent.expand(sourceIndices[numEventsDone],
                     targetIndices[numEventsDone]);
                 numEventsDone++;
-                TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+                TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(),
+                    tezEvent.getEventReceivedTime());
                 tezEventToSend.setDestinationInfo(destinationMetaInfo);
                 listToAdd.add(tezEventToSend);
               }
@@ -590,7 +591,8 @@ public class Edge {
               while (numEventsDone < numEvents && listSize++ < listMaxSize) {
                 InputFailedEvent e = ifEvent.makeCopy(targetIndices[numEventsDone]);
                 numEventsDone++;
-                TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+                TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(),
+                    tezEvent.getEventReceivedTime());
                 tezEventToSend.setDestinationInfo(destinationMetaInfo);
                 listToAdd.add(tezEventToSend);
               }
@@ -622,7 +624,8 @@ public class Edge {
               while (numEventsDone < numEvents && listSize++ < listMaxSize) {
                 DataMovementEvent e = dmEvent.makeCopy(targetIndices[numEventsDone]);
                 numEventsDone++;
-                TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+                TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(),
+                    tezEvent.getEventReceivedTime());
                 tezEventToSend.setDestinationInfo(destinationMetaInfo);
                 listToAdd.add(tezEventToSend);
               }

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 5792af0..266358d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -87,6 +87,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.dag.event.SpeculatorEventTaskAttemptStatusUpdate;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
+import org.apache.tez.dag.app.rm.container.AMContainer;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
@@ -96,6 +97,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto;
 import org.apache.tez.dag.utils.TezBuilderUtils;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
@@ -118,6 +120,37 @@ public class TaskAttemptImpl implements TaskAttempt,
   private static final Logger LOG = LoggerFactory.getLogger(TaskAttemptImpl.class);
   private static final String LINE_SEPARATOR = System
       .getProperty("line.separator");
+  
+  public static class DataEventDependencyInfo {
+    long timestamp;
+    TezTaskAttemptID taId;
+    public DataEventDependencyInfo(long time, TezTaskAttemptID id) {
+      this.timestamp = time;
+      this.taId = id;
+    }
+    public long getTimestamp() {
+      return timestamp;
+    }
+    public TezTaskAttemptID getTaskAttemptId() {
+      return taId;
+    }
+    public static DataEventDependencyInfoProto toProto(DataEventDependencyInfo info) {
+      DataEventDependencyInfoProto.Builder builder = DataEventDependencyInfoProto.newBuilder();
+      builder.setTimestamp(info.timestamp);
+      if (info.taId != null) {
+        builder.setTaskAttemptId(info.taId.toString());
+      }
+      return builder.build();
+    }
+    
+    public static DataEventDependencyInfo fromProto(DataEventDependencyInfoProto proto) {
+      TezTaskAttemptID taId = null;
+      if(proto.hasTaskAttemptId()) {
+        taId = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
+      }
+      return new DataEventDependencyInfo(proto.getTimestamp(), taId);
+    }
+  }
 
   static final TezCounters EMPTY_COUNTERS = new TezCounters();
 
@@ -139,6 +172,7 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   // TODO Can these be replaced by the container object TEZ-1037
   private Container container;
+  private long allocationTime;
   private ContainerId containerId;
   private NodeId containerNodeId;
   private String nodeHttpAddress;
@@ -148,6 +182,10 @@ public class TaskAttemptImpl implements TaskAttempt,
   private final Vertex vertex;
 
   @VisibleForTesting
+  boolean appendNextDataEvent = true;
+  ArrayList<DataEventDependencyInfo> lastDataEvents = Lists.newArrayList();
+  
+  @VisibleForTesting
   TaskAttemptStatus reportedStatus;
   private DAGCounter localityCounter;
   
@@ -166,6 +204,9 @@ public class TaskAttemptImpl implements TaskAttempt,
   private final Resource taskResource;
   private final ContainerContext containerContext;
   private final boolean leafVertex;
+  
+  private TezTaskAttemptID creationCausalTA;
+  private long creationTime;
 
   protected static final FailedTransitionHelper FAILED_HELPER =
       new FailedTransitionHelper();
@@ -411,12 +452,22 @@ public class TaskAttemptImpl implements TaskAttempt,
       boolean isRescheduled,
       Resource resource, ContainerContext containerContext, boolean leafVertex,
       Task task) {
+    this(taskId, attemptNumber, eventHandler, taskAttemptListener, conf, clock,
+        taskHeartbeatHandler, appContext, isRescheduled, resource, containerContext, leafVertex,
+        task, null);
+  }
+  public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
+      TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock,
+      TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
+      boolean isRescheduled,
+      Resource resource, ContainerContext containerContext, boolean leafVertex,
+      Task task, TezTaskAttemptID schedulingCausalTA) {
 
-    this.MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration
+    MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration
         .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration
         .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT);
 
-    this.MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = conf.getDouble(TezConfiguration
+    MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = conf.getDouble(TezConfiguration
         .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, TezConfiguration
         .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT);
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@@ -431,6 +482,8 @@ public class TaskAttemptImpl implements TaskAttempt,
     this.appContext = appContext;
     this.task = task;
     this.vertex = this.task.getVertex();
+    this.creationCausalTA = schedulingCausalTA;
+    this.creationTime = clock.getTime();
 
     this.reportedStatus = new TaskAttemptStatus(this.attemptId);
     initTaskAttemptStatus(reportedStatus);
@@ -462,6 +515,10 @@ public class TaskAttemptImpl implements TaskAttempt,
   public TezDAGID getDAGID() {
     return getVertexID().getDAGId();
   }
+  
+  public TezTaskAttemptID getSchedulingCausalTA() {
+    return creationCausalTA;
+  }
 
   TaskSpec createRemoteTaskSpec() throws AMUserCodeException {
     TaskSpec baseTaskSpec = task.getBaseTaskSpec();
@@ -660,6 +717,33 @@ public class TaskAttemptImpl implements TaskAttempt,
     }
   }
 
+  public long getCreationTime() {
+    readLock.lock();
+    try {
+      return creationTime;
+    } finally {
+      readLock.unlock();
+    }
+  }
+  
+  public TezTaskAttemptID getCreationCausalAttempt() {
+    readLock.lock();
+    try {
+      return creationCausalTA;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public long getAllocationTime() {
+    readLock.lock();
+    try {
+      return allocationTime;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   @Override
   public long getFinishTime() {
     readLock.lock();
@@ -753,6 +837,9 @@ public class TaskAttemptImpl implements TaskAttempt,
         {
           TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) historyEvent;
           this.launchTime = tEvent.getStartTime();
+          this.creationTime = tEvent.getCreationTime();
+          this.allocationTime = tEvent.getAllocationTime();
+          this.creationCausalTA = tEvent.getCreationCausalTA();
           recoveryStartEventSeen = true;
           recoveredState = TaskAttemptState.RUNNING;
           this.containerId = tEvent.getContainerId();
@@ -770,6 +857,9 @@ public class TaskAttemptImpl implements TaskAttempt,
               : TaskAttemptTerminationCause.UNKNOWN_ERROR;
           this.diagnostics.add(tEvent.getDiagnostics());
           this.recoveredState = tEvent.getState();
+          if (tEvent.getDataEvents() != null) {
+            this.lastDataEvents.addAll(tEvent.getDataEvents());
+          }
           sendEvent(createDAGCounterUpdateEventTAFinished(this, tEvent.getState()));
           return recoveredState;
         }
@@ -973,7 +1063,8 @@ public class TaskAttemptImpl implements TaskAttempt,
     TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent(
         attemptId, getVertex().getName(),
         launchTime, containerId, containerNodeId,
-        inProgressLogsUrl, completedLogsUrl, nodeHttpAddress);
+        inProgressLogsUrl, completedLogsUrl, nodeHttpAddress, creationTime, creationCausalTA, 
+        allocationTime);
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGID(), startEvt));
   }
@@ -985,7 +1076,7 @@ public class TaskAttemptImpl implements TaskAttempt,
     TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
         attemptId, getVertex().getName(), getLaunchTime(),
         getFinishTime(), TaskAttemptState.SUCCEEDED, null,
-        "", getCounters());
+        "", getCounters(), lastDataEvents);
     // FIXME how do we store information regd completion events
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1002,7 +1093,7 @@ public class TaskAttemptImpl implements TaskAttempt,
         finishTime, state,
         terminationCause,
         StringUtils.join(
-            getDiagnostics(), LINE_SEPARATOR), getCounters());
+            getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEvents);
     // FIXME how do we store information regd completion events
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1130,7 +1221,7 @@ public class TaskAttemptImpl implements TaskAttempt,
           .getTaskAttemptState());
       // Send out events to the Task - indicating TaskAttemptTermination(F/K)
       ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, helper
-          .getTaskEventType()));
+          .getTaskEventType(), event));
     }
   }
 
@@ -1140,9 +1231,10 @@ public class TaskAttemptImpl implements TaskAttempt,
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent origEvent) {
       TaskAttemptEventStartedRemotely event = (TaskAttemptEventStartedRemotely) origEvent;
 
-      Container container = ta.appContext.getAllContainers()
-          .get(event.getContainerId()).getContainer();
+      AMContainer amContainer = ta.appContext.getAllContainers().get(event.getContainerId()); 
+      Container container = amContainer.getContainer();
 
+      ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime();
       ta.container = container;
       ta.containerId = event.getContainerId();
       ta.containerNodeId = container.getNodeId();
@@ -1268,12 +1360,16 @@ public class TaskAttemptImpl implements TaskAttempt,
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @Override
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
-      TaskStatusUpdateEvent statusEvent = ((TaskAttemptEventStatusUpdate) event)
-          .getStatusEvent();
+      TaskAttemptEventStatusUpdate sEvent = (TaskAttemptEventStatusUpdate) event; 
+      TaskStatusUpdateEvent statusEvent = sEvent.getStatusEvent();
       ta.reportedStatus.state = ta.getState();
       ta.reportedStatus.progress = statusEvent.getProgress();
       ta.reportedStatus.counters = statusEvent.getCounters();
       ta.statistics = statusEvent.getStatistics();
+      if (sEvent.getReadErrorReported()) {
+        // if there is a read error then track the next last data event
+        ta.appendNextDataEvent = true;
+      }
 
       ta.updateProgressSplits();
 
@@ -1518,7 +1614,7 @@ public class TaskAttemptImpl implements TaskAttempt,
             new EventMetaData(EventProducerConsumerType.SYSTEM, 
                 vertex.getName(), 
                 edgeVertex.getName(), 
-                getID())));
+                getID()), appContext.getClock().getTime()));
       }
       sendEvent(new VertexEventRouteEvent(vertex.getVertexId(), tezIfEvents));
     }
@@ -1595,4 +1691,24 @@ public class TaskAttemptImpl implements TaskAttempt,
   public String toString() {
     return getID().toString();
   }
+
+
+  @Override
+  public void setLastEventSent(TezEvent lastEventSent) {
+    writeLock.lock();
+    try {
+      DataEventDependencyInfo info = new DataEventDependencyInfo(
+          lastEventSent.getEventReceivedTime(), lastEventSent.getSourceInfo().getTaskAttemptID());
+      // task attempt id may be null for input data information events
+      if (appendNextDataEvent) {
+        appendNextDataEvent = false;
+        lastDataEvents.add(info);
+      } else {
+        // over-write last event - array list makes it quick
+        lastDataEvents.set(lastDataEvents.size() - 1, info);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index ef8e33a..e6027f5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -70,6 +70,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
@@ -549,7 +550,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   }
 
   private TaskAttempt createRecoveredTaskAttempt(TezTaskAttemptID tezTaskAttemptID) {
-    TaskAttempt taskAttempt = createAttempt(tezTaskAttemptID.getId());
+    TaskAttempt taskAttempt = createAttempt(tezTaskAttemptID.getId(), null);
     return taskAttempt;
   }
 
@@ -814,10 +815,10 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
   
-  TaskAttemptImpl createAttempt(int attemptNumber) {
+  TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedulingCausalTA) {
     return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
         taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,
-        (failedAttempts > 0), taskResource, containerContext, leafVertex, this);
+        (failedAttempts > 0), taskResource, containerContext, leafVertex, this, schedulingCausalTA);
   }
 
   @Override
@@ -834,8 +835,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   }
 
   // This is always called in the Write Lock
-  private void addAndScheduleAttempt() {
-    TaskAttempt attempt = createAttempt(attempts.size());
+  private void addAndScheduleAttempt(TezTaskAttemptID schedulingCausalTA) {
+    TaskAttempt attempt = createAttempt(attempts.size(), schedulingCausalTA);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Created attempt " + attempt.getID());
     }
@@ -1048,7 +1049,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       TaskEventScheduleTask scheduleEvent = (TaskEventScheduleTask) event;
       task.locationHint = scheduleEvent.getTaskLocationHint();
       task.baseTaskSpec = scheduleEvent.getBaseTaskSpec();
-      task.addAndScheduleAttempt();
+      // For now, initial scheduling dependency is due to vertex manager scheduling
+      task.addAndScheduleAttempt(null);
       task.scheduledTime = task.clock.getTime();
       task.logJobHistoryTaskStartedEvent();
     }
@@ -1066,7 +1068,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     public void transition(TaskImpl task, TaskEvent event) {
       LOG.info("Scheduling a redundant attempt for task " + task.taskId);
       task.counters.findCounter(TaskCounter.NUM_SPECULATIONS).increment(1);
-      task.addAndScheduleAttempt();
+      TezTaskAttemptID earliestUnfinishedAttempt = null;
+      for (TaskAttempt ta : task.attempts.values()) {
+        // keep the first unfinished attempt seen (the oldest, assuming creation-order iteration)
+        if (earliestUnfinishedAttempt == null && !ta.isFinished()) {
+          earliestUnfinishedAttempt = ta.getID();
+        }
+      }
+      task.addAndScheduleAttempt(earliestUnfinishedAttempt);
     }
   }
 
@@ -1143,9 +1152,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       // we KillWaitAttemptCompletedTransitionready have a spare
       task.taskAttemptStatus.put(castEvent.getTaskAttemptID().getId(), true);
       task.getVertex().incrementKilledTaskAttemptCount();
-      if (task.getUncompletedAttemptsCount() == 0
-          && task.successfulAttempt == null) {
-        task.addAndScheduleAttempt();
+      if (task.shouldScheduleNewAttempt()) {
+        task.addAndScheduleAttempt(castEvent.getTaskAttemptID());
       }
     }
   }
@@ -1255,7 +1263,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
           // If any incomplete, the running attempt will moved to failed and its
           // update will trigger a new attempt if possible
           if (task.attempts.size() == task.getFinishedAttemptsCount()) {
-            task.addAndScheduleAttempt();
+            task.addAndScheduleAttempt(null);
           }
           endState = TaskStateInternal.RUNNING;
           break;
@@ -1304,15 +1312,23 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       return task.getInternalState();
     }
   }
+  
+  private boolean shouldScheduleNewAttempt() {
+    return (getUncompletedAttemptsCount() == 0
+            && successfulAttempt == null);
+  }
 
   private static class AttemptFailedTransition implements
     MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
+    private TezTaskAttemptID schedulingCausalTA;
+    
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       task.failedAttempts++;
       task.getVertex().incrementFailedTaskAttemptCount();
       TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event;
+      schedulingCausalTA = castEvent.getTaskAttemptID();
       task.addDiagnosticInfo("TaskAttempt " + castEvent.getTaskAttemptID().getId() + " failed,"
           + " info=" + task.getAttempt(castEvent.getTaskAttemptID()).getDiagnostics());
       if (task.commitAttempt != null &&
@@ -1327,12 +1343,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
             ((TaskEventTAUpdate) event).getTaskAttemptID(),
             TaskAttemptStateInternal.FAILED);
         // we don't need a new event if we already have a spare
-        if (task.getUncompletedAttemptsCount() == 0
-            && task.successfulAttempt == null) {
+        if (task.shouldScheduleNewAttempt()) {
           LOG.info("Scheduling new attempt for task: " + task.getTaskId()
               + ", currentFailedAttempts: " + task.failedAttempts + ", maxFailedAttempts: "
               + task.maxFailedAttempts);
-          task.addAndScheduleAttempt();
+          task.addAndScheduleAttempt(getSchedulingCausalTA());
         }
       } else {
         LOG.info("Failing task: " + task.getTaskId()
@@ -1352,11 +1367,17 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     protected TaskStateInternal getDefaultState(TaskImpl task) {
       return task.getInternalState();
     }
+    
+    protected TezTaskAttemptID getSchedulingCausalTA() {
+      return schedulingCausalTA;
+    }
   }
 
   private static class TaskRetroactiveFailureTransition
       extends AttemptFailedTransition {
 
+    private TezTaskAttemptID schedulingCausalTA;
+
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       if (task.leafVertex) {
@@ -1386,6 +1407,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         // succeeded state
         return TaskStateInternal.SUCCEEDED;
       }
+      
+      Preconditions.checkState(castEvent.getCausalEvent() != null);
+      TaskAttemptEventOutputFailed destinationEvent = 
+          (TaskAttemptEventOutputFailed) castEvent.getCausalEvent();
+      schedulingCausalTA = destinationEvent.getInputFailedEvent().getSourceInfo().getTaskAttemptID();
 
       // super.transition is mostly coded for the case where an
       //  UNcompleted task failed.  When a COMPLETED task retroactively
@@ -1402,6 +1428,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
       return returnState;
     }
+    
+    @Override
+    protected TezTaskAttemptID getSchedulingCausalTA() {
+      return schedulingCausalTA;
+    }
 
     @Override
     protected TaskStateInternal getDefaultState(TaskImpl task) {
@@ -1433,7 +1464,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         // from the map splitInfo. So the bad node might be sent as a location
         // to the RM. But the RM would ignore that just like it would ignore
         // currently pending container requests affinitized to bad nodes.
-        task.addAndScheduleAttempt();
+        task.addAndScheduleAttempt(attemptId);
         return TaskStateInternal.SCHEDULED;
       } else {
         // nothing to do

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 7cf12e3..95ee298 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -3446,6 +3446,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
         vertex.distanceFromRoot = distanceFromRoot;
       }
       vertex.numStartedSourceVertices++;
+      vertex.startTimeRequested = vertex.clock.getTime();
       LOG.info("Source vertex started: " + startEvent.getSourceVertexId() +
           " for vertex: " + vertex.logIdentifier + " numStartedSources: " +
           vertex.numStartedSourceVertices + " numSources: " + vertex.sourceVertices.size());
@@ -3504,12 +3505,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       Preconditions.checkState(
           (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty()),
           "Vertex: " + vertex.logIdentifier + " got invalid start event");
-      vertex.startTimeRequested = vertex.clock.getTime();
       vertex.startSignalPending = true;
+      vertex.startTimeRequested = vertex.clock.getTime();
     }
 
   }
-
+  
   public static class StartTransition implements
     MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
 
@@ -3517,7 +3518,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   public VertexState transition(VertexImpl vertex, VertexEvent event) {
       Preconditions.checkState(vertex.getState() == VertexState.INITED,
           "Unexpected state " + vertex.getState() + " for " + vertex.logIdentifier);
-      vertex.startTimeRequested = vertex.clock.getTime();
+      // if the start signal is pending this event is a fake start event to trigger this transition
+      if (!vertex.startSignalPending) {
+        vertex.startTimeRequested = vertex.clock.getTime();
+      }
       return vertex.startVertex();
     }
   }
@@ -4083,7 +4087,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   @Override
   public TaskAttemptEventInfo getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
       int fromEventId, int preRoutedFromEventId, int maxEvents) {
-    ArrayList<TezEvent> events = getTask(attemptID.getTaskID()).getTaskAttemptTezEvents(
+    Task task = getTask(attemptID.getTaskID());
+    ArrayList<TezEvent> events = task.getTaskAttemptTezEvents(
         attemptID, preRoutedFromEventId, maxEvents);
     int nextPreRoutedFromEventId = preRoutedFromEventId + events.size();
     int nextFromEventId = fromEventId;
@@ -4169,6 +4174,20 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     } finally {
       onDemandRouteEventsReadLock.unlock();
     }
+    if (!events.isEmpty()) {
+      for (int i=(events.size() - 1); i>=0; --i) {
+        TezEvent lastEvent = events.get(i);
+        // record the last data movement / root input event sent by the AM to the task
+        EventType lastEventType = lastEvent.getEventType();
+        // if the following changes then critical path logic/recording may need revision
+        if (lastEventType == EventType.COMPOSITE_DATA_MOVEMENT_EVENT ||
+            lastEventType == EventType.DATA_MOVEMENT_EVENT ||
+            lastEventType == EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) {
+          task.getAttempt(attemptID).setLastEventSent(lastEvent);
+          break;
+        }
+      }
+    }
     return new TaskAttemptEventInfo(nextFromEventId, events, nextPreRoutedFromEventId);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 0cc6666..803159a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -231,11 +231,12 @@ public class VertexManager {
         Collection<InputDataInformationEvent> events) {
       checkAndThrowIfDone();
       verifyIsRootInput(inputName);
+      final long currTime = appContext.getClock().getTime();
       Collection<TezEvent> tezEvents = Collections2.transform(events,
           new Function<InputDataInformationEvent, TezEvent>() {
             @Override
             public TezEvent apply(InputDataInformationEvent riEvent) {
-              TezEvent tezEvent = new TezEvent(riEvent, rootEventSourceMetadata);
+              TezEvent tezEvent = new TezEvent(riEvent, rootEventSourceMetadata, currTime);
               tezEvent.setDestinationInfo(getDestinationMetaData(inputName));
               return tezEvent;
             }

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
index a6b403d..7d6da8a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
@@ -32,5 +32,6 @@ public interface AMContainer extends EventHandler<AMContainerEvent>{
   public Container getContainer();
   public List<TezTaskAttemptID> getAllTaskAttempts();
   public TezTaskAttemptID getCurrentTaskAttempt();
+  public long getCurrentTaskAttemptAllocationTime();
   
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 330f2b7..9b90752 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -95,6 +95,7 @@ public class AMContainerImpl implements AMContainer {
   private boolean nodeFailed = false;
 
   private TezTaskAttemptID currentAttempt;
+  private long currentAttemptAllocationTime;
   private List<TezTaskAttemptID> failedAssignments;
 
   private boolean inError = false;
@@ -362,6 +363,16 @@ public class AMContainerImpl implements AMContainer {
     }
   }
 
+  @Override
+  public long getCurrentTaskAttemptAllocationTime() {
+    readLock.lock();
+    try {
+      return this.currentAttemptAllocationTime;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   public boolean isInErrorState() {
     return inError;
   }
@@ -532,6 +543,7 @@ public class AMContainerImpl implements AMContainer {
       // Register the additional resources back for this container.
       container.containerLocalResources.putAll(container.additionalLocalResources);
       container.currentAttempt = event.getTaskAttemptId();
+      container.currentAttemptAllocationTime = container.appContext.getClock().getTime();
       if (LOG.isDebugEnabled()) {
         LOG.debug("AssignTA: attempt: " + event.getRemoteTaskSpec());
         LOG.debug("AdditionalLocalResources: " + container.additionalLocalResources);

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index af529bf..fbde635 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -21,16 +21,22 @@ package org.apache.tez.dag.history.events;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptFinishedProto;
 
 public class TaskAttemptFinishedEvent implements HistoryEvent {
@@ -45,14 +51,16 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
   private String diagnostics;
   private TezCounters tezCounters;
   private TaskAttemptTerminationCause error;
-
+  private List<DataEventDependencyInfo> dataEvents;
+  
   public TaskAttemptFinishedEvent(TezTaskAttemptID taId,
       String vertexName,
       long startTime,
       long finishTime,
       TaskAttemptState state,
       TaskAttemptTerminationCause error,
-      String diagnostics, TezCounters counters) {
+      String diagnostics, TezCounters counters, 
+      List<DataEventDependencyInfo> dataEvents) {
     this.taskAttemptId = taId;
     this.vertexName = vertexName;
     this.startTime = startTime;
@@ -61,6 +69,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
     this.diagnostics = diagnostics;
     this.tezCounters = counters;
     this.error = error;
+    this.dataEvents = dataEvents;
   }
 
   public TaskAttemptFinishedEvent() {
@@ -80,7 +89,11 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
   public boolean isHistoryEvent() {
     return true;
   }
-
+  
+  public List<DataEventDependencyInfo> getDataEvents() {
+    return dataEvents;
+  }
+  
   public TaskAttemptFinishedProto toProto() {
     TaskAttemptFinishedProto.Builder builder =
         TaskAttemptFinishedProto.newBuilder();
@@ -96,6 +109,11 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
     if (tezCounters != null) {
       builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
     }
+    if (dataEvents != null && !dataEvents.isEmpty()) {
+      for (DataEventDependencyInfo info : dataEvents) {
+        builder.addDataEvents(DataEventDependencyInfo.toProto(info));
+      }
+    }
     return builder.build();
   }
 
@@ -113,6 +131,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
       this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
         proto.getCounters());
     }
+    if (proto.getDataEventsCount() > 0) {
+      this.dataEvents = Lists.newArrayListWithCapacity(proto.getDataEventsCount());
+      for (DataEventDependencyInfoProto protoEvent : proto.getDataEventsList()) {
+        this.dataEvents.add(DataEventDependencyInfo.fromProto(protoEvent));
+      }
+    }
   }
 
   @Override
@@ -140,6 +164,8 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
         + ", status=" + state.name()
         + ", errorEnum=" + (error != null ? error.name() : "")
         + ", diagnostics=" + diagnostics
+        + ", lastDataEventSourceTA=" + 
+              ((dataEvents==null) ? 0:dataEvents.size())
         + ", counters=" + (tezCounters == null ? "null" :
           tezCounters.toString()
             .replaceAll("\\n", ", ").replaceAll("\\s+", " "));

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index 36add86..4d15fb9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -36,24 +36,30 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
   private String inProgressLogsUrl;
   private String completedLogsUrl;
   private String vertexName;
-  private long startTime;
+  private long launchTime;
   private ContainerId containerId;
   private NodeId nodeId;
   private String nodeHttpAddress;
+  private TezTaskAttemptID creationCausalTA;
+  private long creationTime;
+  private long allocationTime;
 
   public TaskAttemptStartedEvent(TezTaskAttemptID taId,
-      String vertexName, long startTime,
+      String vertexName, long launchTime,
       ContainerId containerId, NodeId nodeId,
       String inProgressLogsUrl, String completedLogsUrl,
-      String nodeHttpAddress) {
+      String nodeHttpAddress, long creationTime, TezTaskAttemptID creationCausalTA, long allocationTime) {
     this.taskAttemptId = taId;
     this.vertexName = vertexName;
-    this.startTime = startTime;
+    this.launchTime = launchTime;
     this.containerId = containerId;
     this.nodeId = nodeId;
     this.inProgressLogsUrl = inProgressLogsUrl;
     this.completedLogsUrl = completedLogsUrl;
     this.nodeHttpAddress = nodeHttpAddress;
+    this.creationTime = creationTime;
+    this.creationCausalTA = creationCausalTA;
+    this.allocationTime = allocationTime;
   }
 
   public TaskAttemptStartedEvent() {
@@ -75,19 +81,29 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
   }
 
   public TaskAttemptStartedProto toProto() {
-    return TaskAttemptStartedProto.newBuilder()
-        .setTaskAttemptId(taskAttemptId.toString())
-        .setStartTime(startTime)
+    TaskAttemptStartedProto.Builder builder = TaskAttemptStartedProto.newBuilder();
+    builder.setTaskAttemptId(taskAttemptId.toString())
+        .setStartTime(launchTime)
         .setContainerId(containerId.toString())
         .setNodeId(nodeId.toString())
-        .build();
+        .setCreationTime(creationTime)
+        .setAllocationTime(allocationTime);
+    if (creationCausalTA != null) {
+      builder.setCreationCausalTA(creationCausalTA.toString());
+    }
+    return builder.build();
   }
 
   public void fromProto(TaskAttemptStartedProto proto) {
     this.taskAttemptId = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
-    this.startTime = proto.getStartTime();
+    this.launchTime = proto.getStartTime();
     this.containerId = ConverterUtils.toContainerId(proto.getContainerId());
     this.nodeId = ConverterUtils.toNodeId(proto.getNodeId());
+    this.creationTime = proto.getCreationTime();
+    this.allocationTime = proto.getAllocationTime();
+    if (proto.hasCreationCausalTA()) {
+      this.creationCausalTA = TezTaskAttemptID.fromString(proto.getCreationCausalTA());
+    }
   }
 
   @Override
@@ -108,7 +124,9 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
   public String toString() {
     return "vertexName=" + vertexName
         + ", taskAttemptId=" + taskAttemptId
-        + ", startTime=" + startTime
+        + ", creationTime=" + creationTime
+        + ", allocationTime=" + allocationTime
+        + ", startTime=" + launchTime
         + ", containerId=" + containerId
         + ", nodeId=" + nodeId
         + ", inProgressLogs=" + inProgressLogsUrl
@@ -120,7 +138,19 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
   }
 
   public long getStartTime() {
-    return startTime;
+    return launchTime;
+  }
+  
+  public long getCreationTime() {
+    return creationTime;
+  }
+  
+  public long getAllocationTime() {
+    return allocationTime;
+  }
+  
+  public TezTaskAttemptID getCreationCausalTA() {
+    return creationCausalTA;
   }
 
   public ContainerId getContainerId() {

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
index 0310a26..6f44f33 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
@@ -145,6 +145,7 @@ public class VertexRecoverableEventsGeneratedEvent implements HistoryEvent {
         if (event.getDestinationInfo() != null) {
           evtBuilder.setDestinationInfo(convertEventMetaDataToProto(event.getDestinationInfo()));
         }
+        evtBuilder.setEventTime(event.getEventReceivedTime());
         tezEventProtos.add(evtBuilder.build());
       }
     }
@@ -184,7 +185,7 @@ public class VertexRecoverableEventsGeneratedEvent implements HistoryEvent {
       if (eventProto.hasDestinationInfo()) {
         destinationInfo = convertEventMetaDataFromProto(eventProto.getDestinationInfo());
       }
-      TezEvent tezEvent = new TezEvent(evt, sourceInfo);
+      TezEvent tezEvent = new TezEvent(evt, sourceInfo, eventProto.getEventTime());
       tezEvent.setDestinationInfo(destinationInfo);
       this.events.add(tezEvent);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 07ce2f3..411d677 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -530,6 +530,10 @@ public class HistoryEventJsonConversion {
     otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
     otherInfo.put(ATSConstants.COUNTERS,
         DAGUtils.convertCountersToJSON(event.getCounters()));
+    if (event.getDataEvents() != null && !event.getDataEvents().isEmpty()) {
+      otherInfo.put(ATSConstants.LAST_DATA_EVENTS, 
+          DAGUtils.convertDataEventDependencyInfoToJSON(event.getDataEvents()));
+    }
     jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
 
     return jsonObject;
@@ -573,6 +577,11 @@ public class HistoryEventJsonConversion {
     JSONObject otherInfo = new JSONObject();
     otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
     otherInfo.put(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
+    otherInfo.put(ATSConstants.CREATION_TIME, event.getCreationTime());
+    otherInfo.put(ATSConstants.ALLOCATION_TIME, event.getAllocationTime());
+    if (event.getCreationCausalTA() != null) {
+      otherInfo.put(ATSConstants.CREATION_CAUSAL_ATTEMPT, event.getCreationCausalTA().toString());
+    }
     jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
 
     return jsonObject;

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 1447832..76e592e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
@@ -42,6 +43,8 @@ import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
 import org.apache.tez.dag.app.dag.impl.VertexStats;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
+import org.apache.tez.dag.history.logging.EntityTypes;
 import org.apache.tez.dag.records.TezTaskID;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -99,6 +102,27 @@ public class DAGUtils {
     }
     return dagJson;
   }
+  
+  public static JSONObject convertDataEventDependencyInfoToJSON(List<DataEventDependencyInfo> info) {
+    return new JSONObject(convertDataEventDependecyInfoToATS(info));
+  }
+  
+  public static Map<String, Object> convertDataEventDependecyInfoToATS(List<DataEventDependencyInfo> info) {
+    ArrayList<Object> infoList = new ArrayList<Object>();
+    for (DataEventDependencyInfo event : info) {
+      Map<String, Object> eventObj = new LinkedHashMap<String, Object>();
+      String id = "";
+      if (event.getTaskAttemptId() != null) {
+        id = event.getTaskAttemptId().toString();
+      }
+      eventObj.put(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), id);
+      eventObj.put(ATSConstants.TIMESTAMP, event.getTimestamp());
+      infoList.add(eventObj);
+    }
+    Map<String,Object> object = new LinkedHashMap<String, Object>();
+    putInto(object, ATSConstants.LAST_DATA_EVENTS, infoList);
+    return object;
+  }
 
   public static JSONObject convertCountersToJSON(TezCounters counters)
       throws JSONException {

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index 8af48b6..232f1b7 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -164,6 +164,14 @@ message TaskAttemptStartedProto {
   optional int64 start_time = 2;
   optional string container_id = 3;
   optional string node_id = 4;
+  optional int64 creation_time = 5;
+  optional string creation_causal_t_a = 6;
+  optional int64 allocation_time = 7;
+}
+
+message DataEventDependencyInfoProto {
+  optional string task_attempt_id = 1;
+  optional int64 timestamp = 2;
 }
 
 message TaskAttemptFinishedProto {
@@ -173,6 +181,7 @@ message TaskAttemptFinishedProto {
   optional string diagnostics = 4;
   optional TezCountersProto counters = 5;
   optional string error_enum = 6;
+  repeated DataEventDependencyInfoProto data_events = 7;
 }
 
 message EventMetaDataProto {
@@ -189,6 +198,7 @@ message TezDataMovementEventProto {
   optional CompositeEventProto composite_data_movement_event = 4;
   optional RootInputDataInformationEventProto root_input_data_information_event = 5;
   optional RootInputInitializerEventProto input_initializer_event = 6;
+  optional int64 event_time = 7;
 }
 
 message VertexDataMovementEventsGeneratedProto {

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 08f6ff6..8fa57d3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -114,7 +114,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
   }
   
   public static interface EventsDelegate {
-    public void getEvents(TaskSpec taskSpec, List<TezEvent> events);
+    public void getEvents(TaskSpec taskSpec, List<TezEvent> events, long time);
   }
 
   // mock container launcher does not launch real tasks.
@@ -408,7 +408,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
               List<TezEvent> events = Lists.newArrayListWithCapacity(
                                       cData.taskSpec.getOutputs().size() + 1);
               if (cData.numUpdates == 0 && eventsDelegate != null) {
-                eventsDelegate.getEvents(cData.taskSpec, events);
+                eventsDelegate.getEvents(cData.taskSpec, events, getContext().getClock().getTime());
               }
               TezCounters counters = null;
               if (countersDelegate != null) {
@@ -422,7 +422,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
               float maxUpdates = (updatesToMake != null) ? updatesToMake.intValue() : 1;
               float progress = updateProgress ? cData.numUpdates/maxUpdates : 0f;
               events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats), new EventMetaData(
-                  EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId)));
+                  EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId),
+                  getContext().getClock().getTime()));
               TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events,
                   cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 50000);
               doHeartbeat(request, cData);
@@ -433,7 +434,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
               cData.completed = true;
               List<TezEvent> events = Collections.singletonList(new TezEvent(
                   new TaskAttemptCompletedEvent(), new EventMetaData(
-                      EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId)));
+                      EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId),
+                  getContext().getClock().getTime()));
               TezHeartbeatRequest request = new TezHeartbeatRequest(++cData.numUpdates, events,
                   cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 10000);
               doHeartbeat(request, cData);

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 4137d42..42d4b0b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -126,17 +126,17 @@ public class TestMockDAGAppMaster {
   
   static class TestEventsDelegate implements EventsDelegate {
     @Override
-    public void getEvents(TaskSpec taskSpec, List<TezEvent> events) {
+    public void getEvents(TaskSpec taskSpec, List<TezEvent> events, long time) {
       for (OutputSpec output : taskSpec.getOutputs()) {
         if (output.getPhysicalEdgeCount() == 1) {
           events.add(new TezEvent(DataMovementEvent.create(0, 0, 0, null), new EventMetaData(
               EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), output
-                  .getDestinationVertexName(), taskSpec.getTaskAttemptID())));
+                  .getDestinationVertexName(), taskSpec.getTaskAttemptID()), time));
         } else {
           events.add(new TezEvent(CompositeDataMovementEvent.create(0,
               output.getPhysicalEdgeCount(), null), new EventMetaData(
               EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), output
-                  .getDestinationVertexName(), taskSpec.getTaskAttemptID())));
+                  .getDestinationVertexName(), taskSpec.getTaskAttemptID()), time));
         }
       }
     }    

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 5c24ecc..641f3a2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -47,6 +47,7 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
@@ -58,6 +59,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventType;
@@ -99,12 +101,15 @@ public class TestTaskAttemptListenerImplTezDag {
 
     eventHandler = mock(EventHandler.class);
 
+    MockClock clock = new MockClock();
+    
     appContext = mock(AppContext.class);
     doReturn(eventHandler).when(appContext).getEventHandler();
     doReturn(dag).when(appContext).getCurrentDAG();
     doReturn(appAcls).when(appContext).getApplicationACLs();
     doReturn(amContainerMap).when(appContext).getAllContainers();
-
+    doReturn(clock).when(appContext).getClock();
+    
     taskAttemptListener = new TaskAttemptListenerImplForTest(appContext,
         mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null);
 
@@ -198,6 +203,7 @@ public class TestTaskAttemptListenerImplTezDag {
     final Event statusUpdateEvent = argAllValues.get(0);
     assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE,
         statusUpdateEvent.getType());
+    assertEquals(false, ((TaskAttemptEventStatusUpdate)statusUpdateEvent).getReadErrorReported());
 
     final Event vertexEvent = argAllValues.get(1);
     final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent;
@@ -207,9 +213,39 @@ public class TestTaskAttemptListenerImplTezDag {
         vertexRouteEvent.getEvents().get(0).getEventType());
     assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT,
         vertexRouteEvent.getEvents().get(1).getEventType());
+  }
+  
+  @Test (timeout = 5000)
+  public void testTaskEventRoutingWithReadError() throws Exception {
+    List<TezEvent> events =  Arrays.asList(
+      new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null),
+      new TezEvent(InputReadErrorEvent.create("", 0, 0), null),
+      new TezEvent(new TaskAttemptCompletedEvent(), null)
+    );
+
+    generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>());
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(2)).handle(arg.capture());
+    final List<Event> argAllValues = arg.getAllValues();
+
+    final Event statusUpdateEvent = argAllValues.get(0);
+    assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE,
+        statusUpdateEvent.getType());
+    assertEquals(true, ((TaskAttemptEventStatusUpdate)statusUpdateEvent).getReadErrorReported());
+
+    final Event vertexEvent = argAllValues.get(1);
+    final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent;
+    assertEquals("First event should be routed to vertex", VertexEventType.V_ROUTE_EVENT,
+        vertexEvent.getType());
+    assertEquals(EventType.INPUT_READ_ERROR_EVENT,
+        vertexRouteEvent.getEvents().get(0).getEventType());
+    assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT,
+        vertexRouteEvent.getEvents().get(1).getEventType());
 
   }
 
+
   @Test (timeout = 5000)
   public void testTaskEventRoutingTaskAttemptOnly() throws Exception {
     List<TezEvent> events = Arrays.asList(

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 5d05fa3..1a1cb11 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -587,6 +588,94 @@ public class TestTaskAttempt {
   }
   
   @Test(timeout = 5000)
+  public void testLastDataEventRecording() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+
+    MockEventHandler eventHandler = spy(new MockEventHandler());
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
+    taskConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true);
+
+    locationHint = TaskLocationHint.createTaskLocationHint(
+        new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
+    Resource resource = Resource.newInstance(1024, 1);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    @SuppressWarnings("deprecation")
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        new ContainerContextMatcher(), appCtx);
+    containers.addContainerIfNew(container);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, taskConf, new SystemClock(),
+        mockHeartbeatHandler, appCtx, false,
+        resource, createFakeContainerContext(), false);
+    TezTaskAttemptID taskAttemptID = taImpl.getID();
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
+    // At state STARTING.
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
+        null));
+    assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+    
+    long ts1 = 1024;
+    long ts2 = 2048;
+    TezTaskAttemptID mockId1 = mock(TezTaskAttemptID.class);
+    TezTaskAttemptID mockId2 = mock(TezTaskAttemptID.class);
+    TezEvent mockTezEvent1 = mock(TezEvent.class, RETURNS_DEEP_STUBS);
+    when(mockTezEvent1.getEventReceivedTime()).thenReturn(ts1);
+    when(mockTezEvent1.getSourceInfo().getTaskAttemptID()).thenReturn(mockId1);
+    TezEvent mockTezEvent2 = mock(TezEvent.class, RETURNS_DEEP_STUBS);
+    when(mockTezEvent2.getEventReceivedTime()).thenReturn(ts2);
+    when(mockTezEvent2.getSourceInfo().getTaskAttemptID()).thenReturn(mockId2);
+    TaskAttemptEventStatusUpdate statusEvent =
+        new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null));
+
+    assertEquals(0, taImpl.lastDataEvents.size());
+    taImpl.setLastEventSent(mockTezEvent1);
+    assertEquals(1, taImpl.lastDataEvents.size());
+    assertEquals(ts1, taImpl.lastDataEvents.get(0).getTimestamp());
+    assertEquals(mockId1, taImpl.lastDataEvents.get(0).getTaskAttemptId());
+    taImpl.handle(statusEvent);
+    taImpl.setLastEventSent(mockTezEvent2);
+    assertEquals(1, taImpl.lastDataEvents.size());
+    assertEquals(ts2, taImpl.lastDataEvents.get(0).getTimestamp());
+    assertEquals(mockId2, taImpl.lastDataEvents.get(0).getTaskAttemptId()); // over-write earlier value
+    statusEvent.setReadErrorReported(true);
+    taImpl.handle(statusEvent);
+    taImpl.setLastEventSent(mockTezEvent1);
+    assertEquals(2, taImpl.lastDataEvents.size());
+    assertEquals(ts1, taImpl.lastDataEvents.get(1).getTimestamp());
+    assertEquals(mockId1, taImpl.lastDataEvents.get(1).getTaskAttemptId()); // add new event
+    taImpl.setLastEventSent(mockTezEvent2);
+    assertEquals(2, taImpl.lastDataEvents.size());
+    assertEquals(ts2, taImpl.lastDataEvents.get(1).getTimestamp());
+    assertEquals(mockId2, taImpl.lastDataEvents.get(1).getTaskAttemptId()); // over-write earlier value
+  }
+  
+  @Test(timeout = 5000)
   public void testFailure() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index 0665b1e..90274eb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -19,7 +19,6 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -52,6 +51,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.history.HistoryEventType;
@@ -66,12 +66,16 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import com.google.common.collect.Lists;
+
 @SuppressWarnings({ "unchecked", "rawtypes" })
 public class TestTaskAttemptRecovery {
 
   private TaskAttemptImpl ta;
   private EventHandler mockEventHandler;
-  private long startTime = System.currentTimeMillis();
+  private long creationTime = System.currentTimeMillis();
+  private long allocationTime = creationTime + 5000;
+  private long startTime = allocationTime + 5000;
   private long finishTime = startTime + 5000;
 
   private TezTaskAttemptID taId;
@@ -153,9 +157,14 @@ public class TestTaskAttemptRecovery {
   }
 
   private void restoreFromTAStartEvent() {
+    TezTaskAttemptID causalId = TezTaskAttemptID.getInstance(taId.getTaskID(), taId.getId()+1);
     TaskAttemptState recoveredState =
         ta.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
-            startTime, mock(ContainerId.class), mock(NodeId.class), "", "", ""));
+            startTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", creationTime, causalId, 
+            allocationTime));
+    assertEquals(causalId, ta.getCreationCausalAttempt());
+    assertEquals(creationTime, ta.getCreationTime());
+    assertEquals(allocationTime, ta.getAllocationTime());
     assertEquals(startTime, ta.getLaunchTime());
     assertEquals(TaskAttemptState.RUNNING, recoveredState);
   }
@@ -169,9 +178,14 @@ public class TestTaskAttemptRecovery {
       errorEnum = TaskAttemptTerminationCause.APPLICATION_ERROR;
     }
 
+    long lastDataEventTime = 1024;
+    TezTaskAttemptID lastDataEventTA = mock(TezTaskAttemptID.class);
+    List<DataEventDependencyInfo> events = Lists.newLinkedList();
+    events.add(new DataEventDependencyInfo(lastDataEventTime, lastDataEventTA));
+    events.add(new DataEventDependencyInfo(lastDataEventTime, lastDataEventTA));
     TaskAttemptState recoveredState =
         ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            startTime, finishTime, state, errorEnum, diag, counters));
+            startTime, finishTime, state, errorEnum, diag, counters, events));
     assertEquals(startTime, ta.getLaunchTime());
     assertEquals(finishTime, ta.getFinishTime());
     assertEquals(counters, ta.reportedStatus.counters);
@@ -180,6 +194,9 @@ public class TestTaskAttemptRecovery {
     assertEquals(1, ta.getDiagnostics().size());
     assertEquals(diag, ta.getDiagnostics().get(0));
     assertEquals(state, recoveredState);
+    assertEquals(events.size(), ta.lastDataEvents.size());
+    assertEquals(lastDataEventTime, ta.lastDataEvents.get(0).getTimestamp());
+    assertEquals(lastDataEventTA, ta.lastDataEvents.get(0).getTaskAttemptId());
     if (state != TaskAttemptState.SUCCEEDED) {
       assertEquals(errorEnum, ta.getTerminationCause());
     } else {
@@ -304,7 +321,7 @@ public class TestTaskAttemptRecovery {
     TaskAttemptState recoveredState =
         ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             startTime, finishTime, TaskAttemptState.KILLED,
-            TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters()));
+            TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters(), null));
     assertEquals(TaskAttemptState.KILLED, recoveredState);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 87dd2fa..1ee6c4e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -56,6 +56,7 @@ import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.TaskStateInternal;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
 import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
@@ -72,6 +73,7 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -359,7 +361,10 @@ public class TestTaskImpl {
     LOG.info("--- START: testKillScheduledTaskAttempt ---");
     TezTaskID taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
+    TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID();
     killScheduledTaskAttempt(mockTask.getLastAttempt().getID());
+    // last killed attempt should be causal TA of next attempt
+    Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA());
   }
 
   @Test(timeout = 5000)
@@ -383,8 +388,11 @@ public class TestTaskImpl {
     LOG.info("--- START: testKillRunningTaskAttempt ---");
     TezTaskID taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
+    TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID();
     launchTaskAttempt(mockTask.getLastAttempt().getID());
     killRunningTaskAttempt(mockTask.getLastAttempt().getID());
+    // last killed attempt should be causal TA of next attempt
+    Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA());
   }
 
   /**
@@ -505,11 +513,15 @@ public class TestTaskImpl {
 
     // During the task attempt commit there is an exception which causes
     // the attempt to fail
+    TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID();
     updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.FAILED);
+    assertEquals(1, mockTask.getAttemptList().size());
     failRunningTaskAttempt(mockTask.getLastAttempt().getID());
 
     assertEquals(2, mockTask.getAttemptList().size());
     assertEquals(1, mockTask.failedAttempts);
+    // last failed attempt should be the causal TA
+    Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA());
 
     assertFalse("First attempt should not commit",
         mockTask.canCommit(mockTask.getAttemptList().get(0).getID()));
@@ -553,6 +565,7 @@ public class TestTaskImpl {
     scheduleTaskAttempt(taskId);
     launchTaskAttempt(mockTask.getLastAttempt().getID());
     updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
+    TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID();
     
     // Add a speculative task attempt that succeeds
     mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
@@ -560,6 +573,11 @@ public class TestTaskImpl {
     launchTaskAttempt(mockTask.getLastAttempt().getID());
     updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
     
+    assertEquals(2, mockTask.getAttemptList().size());
+    
+    // previous running attempt should be the causal TA of this speculative attempt
+    Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA());
+    
     assertTrue("Second attempt should commit",
         mockTask.canCommit(mockTask.getAttemptList().get(1).getID()));
     assertFalse("First attempt should not commit",
@@ -602,8 +620,14 @@ public class TestTaskImpl {
 
     eventHandler.events.clear();
     // Now fail the attempt after it has succeeded
+    TezTaskAttemptID mockDestId = mock(TezTaskAttemptID.class);
+    TezEvent mockTezEvent = mock(TezEvent.class);
+    EventMetaData meta = new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", mockDestId);
+    when(mockTezEvent.getSourceInfo()).thenReturn(meta);
+    TaskAttemptEventOutputFailed outputFailedEvent = 
+        new TaskAttemptEventOutputFailed(mockDestId, mockTezEvent, 1);
     mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt()
-        .getID(), TaskEventType.T_ATTEMPT_FAILED));
+        .getID(), TaskEventType.T_ATTEMPT_FAILED, outputFailedEvent));
 
     // The task should still be in the scheduled state
     assertTaskScheduledState();
@@ -611,6 +635,12 @@ public class TestTaskImpl {
     Assert.assertEquals(AMNodeEventType.N_TA_ENDED, event.getType());
     event = eventHandler.events.get(eventHandler.events.size()-1);
     Assert.assertEquals(VertexEventType.V_TASK_RESCHEDULED, event.getType());
+    
+    // the attempt that reported the output read error should be the causal TA
+    List<MockTaskAttemptImpl> attempts = mockTask.getAttemptList();
+    Assert.assertEquals(2, attempts.size());
+    MockTaskAttemptImpl newAttempt = attempts.get(1);
+    Assert.assertEquals(mockDestId, newAttempt.getSchedulingCausalTA());
   }
 
   @Test(timeout = 5000)
@@ -695,11 +725,11 @@ public class TestTaskImpl {
     }
 
     @Override
-    protected TaskAttemptImpl createAttempt(int attemptNumber) {
+    protected TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedCausalTA) {
       MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(),
           attemptNumber, eventHandler, taskAttemptListener,
           conf, clock, taskHeartbeatHandler, appContext,
-          true, taskResource, containerContext);
+          true, taskResource, containerContext, schedCausalTA);
       taskAttempts.add(attempt);
       return attempt;
     }
@@ -746,9 +776,10 @@ public class TestTaskImpl {
         EventHandler eventHandler, TaskAttemptListener tal, Configuration conf,
         Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
         boolean isRescheduled,
-        Resource resource, ContainerContext containerContext) {
+        Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) {
       super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh,
-          appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class));
+          appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class),
+          schedCausalTA);
     }
     
     @Override