You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/08/04 19:44:38 UTC

tez git commit: TEZ-2647. Add input causality dependency for attempts (bikas)

Repository: tez
Updated Branches:
  refs/heads/master 73da831e8 -> 6a99798f2


TEZ-2647. Add input causality dependency for attempts (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6a99798f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6a99798f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6a99798f

Branch: refs/heads/master
Commit: 6a99798f250310f8bd819aae89123e6146364983
Parents: 73da831
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Aug 4 10:42:40 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Aug 4 10:43:09 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/common/ATSConstants.java     |  2 ++
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  5 ++-
 .../org/apache/tez/dag/app/dag/TaskAttempt.java |  3 ++
 .../org/apache/tez/dag/app/dag/impl/Edge.java   | 13 +++++---
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 21 ++++++++++--
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 17 +++++++++-
 .../tez/dag/app/dag/impl/VertexManager.java     |  3 +-
 .../events/TaskAttemptFinishedEvent.java        | 27 +++++++++++++++-
 .../VertexRecoverableEventsGeneratedEvent.java  |  3 +-
 .../impl/HistoryEventJsonConversion.java        |  4 +++
 tez-dag/src/main/proto/HistoryEvents.proto      |  3 ++
 .../apache/tez/dag/app/MockDAGAppMaster.java    | 10 +++---
 .../tez/dag/app/TestMockDAGAppMaster.java       |  6 ++--
 .../app/TestTaskAttemptListenerImplTezDag.java  |  5 ++-
 .../app/dag/impl/TestTaskAttemptRecovery.java   |  8 +++--
 .../tez/dag/app/dag/impl/TestTaskRecovery.java  | 34 ++++++++++----------
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  6 +++-
 .../TestHistoryEventsProtoConversion.java       | 13 ++++++--
 .../impl/TestHistoryEventJsonConversion.java    |  2 +-
 .../parser/datamodel/TaskAttemptInfo.java       | 15 +++++++++
 .../apache/tez/history/TestATSFileParser.java   | 24 +++++++++++++-
 .../ats/HistoryEventTimelineConversion.java     |  6 +++-
 .../ats/TestHistoryEventTimelineConversion.java |  9 ++++--
 .../apache/tez/runtime/api/impl/TezEvent.java   | 17 ++++++++++
 25 files changed, 207 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 59307b7..1f85954 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-2565. Consider scanning unfinished tasks in VertexImpl::constructStatistics to reduce merge overhead.
   TEZ-2468. Change the minimum Java version to Java 7.
   TEZ-2646. Add scheduling causal dependency for attempts
+  TEZ-2647. Add input causality dependency for attempts
 
 ALL CHANGES:
   TEZ-2172. FetcherOrderedGrouped using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation.

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index 4bf9f6d..1568b96 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -81,6 +81,8 @@ public class ATSConstants {
   public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL";
   public static final String COMPLETED_LOGS_URL = "completedLogsURL";
   public static final String EXIT_STATUS = "exitStatus";
+  public static final String LAST_DATA_EVENT_TIME = "lastDataEventTime";
+  public static final String LAST_DATA_EVENT_SOURCE_TA = "lastDataEventSourceTA";
   public static final String UPDATED_EDGE_MANAGERS = "updatedEdgeManagers";
   public static final String SCHEDULING_CAUSAL_ATTEMPT = "schedulingCausalAttempt";
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index fe92f3a..b7896c1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.commons.collections4.ListUtils;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventType;
 import org.slf4j.Logger;
@@ -424,6 +423,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
               " events: " + (inEvents != null? inEvents.size() : -1));
         }
 
+        long currTime = context.getClock().getTime();
         List<TezEvent> otherEvents = new ArrayList<TezEvent>();
         // route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events
         // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT)
@@ -431,6 +431,9 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
         //  1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent
         //  2. TaskStatusEvent is handled before TaskAttemptFinishedEvent
         for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
+          // for now, set the event time on the AM when it is received.
+          // this avoids any time disparity between machines.
+          tezEvent.setEventReceivedTime(currTime);
           final EventType eventType = tezEvent.getEventType();
           if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
             TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index 6c85cc2..4360cc3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -34,6 +34,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.impl.TezEvent;
 
 /**
  * Read only view of TaskAttempt.
@@ -79,6 +80,8 @@ public interface TaskAttempt {
   float getProgress();
   TaskAttemptState getState();
   TaskAttemptState getStateNoLock();
+  
+  void setLastEventSent(TezEvent lastEventSent);
 
   /** 
    * Has attempt reached the final state or not.

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index ddccf8d..da74a46 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -377,7 +377,7 @@ public class Edge {
     EventMetaData srcInfo = tezEvent.getSourceInfo();
     
     for (DataMovementEvent dmEvent : compEvent.getEvents()) {
-      TezEvent newEvent = new TezEvent(dmEvent, srcInfo);
+      TezEvent newEvent = new TezEvent(dmEvent, srcInfo, tezEvent.getEventReceivedTime());
       sendTezEventToDestinationTasks(newEvent);
     }
   }
@@ -406,7 +406,7 @@ public class Edge {
             InputFailedEvent ifEvent = ((InputFailedEvent) event);
             e = InputFailedEvent.create(inputIndex, ifEvent.getVersion());
           }
-          tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+          tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(), tezEvent.getEventReceivedTime());
           tezEventToSend.setDestinationInfo(destinationMetaInfo);
           // cache the event object per input because are unique per input index
           inputIndicesWithEvents.put(inputIndex, tezEventToSend);
@@ -553,7 +553,8 @@ public class Edge {
                 DataMovementEvent e = compEvent.expand(sourceIndices[numEventsDone],
                     targetIndices[numEventsDone]);
                 numEventsDone++;
-                TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+                TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(),
+                    tezEvent.getEventReceivedTime());
                 tezEventToSend.setDestinationInfo(destinationMetaInfo);
                 listToAdd.add(tezEventToSend);
               }
@@ -585,7 +586,8 @@ public class Edge {
               while (numEventsDone < numEvents && listSize++ < listMaxSize) {
                 InputFailedEvent e = ifEvent.makeCopy(targetIndices[numEventsDone]);
                 numEventsDone++;
-                TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+                TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(),
+                    tezEvent.getEventReceivedTime());
                 tezEventToSend.setDestinationInfo(destinationMetaInfo);
                 listToAdd.add(tezEventToSend);
               }
@@ -617,7 +619,8 @@ public class Edge {
               while (numEventsDone < numEvents && listSize++ < listMaxSize) {
                 DataMovementEvent e = dmEvent.makeCopy(targetIndices[numEventsDone]);
                 numEventsDone++;
-                TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+                TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(),
+                    tezEvent.getEventReceivedTime());
                 tezEventToSend.setDestinationInfo(destinationMetaInfo);
                 listToAdd.add(tezEventToSend);
               }

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 40636dd..ebf7c58 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -147,6 +147,10 @@ public class TaskAttemptImpl implements TaskAttempt,
   private final Vertex vertex;
 
   @VisibleForTesting
+  long lastDataEventTime;
+  TezTaskAttemptID lastDataEventSourceTA = null;
+  
+  @VisibleForTesting
   TaskAttemptStatus reportedStatus;
   private DAGCounter localityCounter;
   
@@ -754,6 +758,8 @@ public class TaskAttemptImpl implements TaskAttempt,
               : TaskAttemptTerminationCause.UNKNOWN_ERROR;
           this.diagnostics.add(tEvent.getDiagnostics());
           this.recoveredState = tEvent.getState();
+          this.lastDataEventTime = tEvent.getLastDataEventTime();
+          this.lastDataEventSourceTA = tEvent.getLastDataEventSourceTA();
           sendEvent(createDAGCounterUpdateEventTAFinished(this, tEvent.getState()));
           return recoveredState;
         }
@@ -969,7 +975,7 @@ public class TaskAttemptImpl implements TaskAttempt,
     TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
         attemptId, getVertex().getName(), getLaunchTime(),
         getFinishTime(), TaskAttemptState.SUCCEEDED, null,
-        "", getCounters());
+        "", getCounters(), lastDataEventTime, lastDataEventSourceTA);
     // FIXME how do we store information regd completion events
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -982,7 +988,8 @@ public class TaskAttemptImpl implements TaskAttempt,
         clock.getTime(), state,
         terminationCause,
         StringUtils.join(
-            getDiagnostics(), LINE_SEPARATOR), getCounters());
+            getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEventTime, 
+        lastDataEventSourceTA);
     // FIXME how do we store information regd completion events
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1485,7 +1492,7 @@ public class TaskAttemptImpl implements TaskAttempt,
             new EventMetaData(EventProducerConsumerType.SYSTEM, 
                 vertex.getName(), 
                 edgeVertex.getName(), 
-                getID())));
+                getID()), appContext.getClock().getTime()));
       }
       sendEvent(new VertexEventRouteEvent(vertex.getVertexId(), tezIfEvents));
     }
@@ -1562,4 +1569,12 @@ public class TaskAttemptImpl implements TaskAttempt,
   public String toString() {
     return getID().toString();
   }
+
+
+  @Override
+  public void setLastEventSent(TezEvent lastEventSent) {
+    // task attempt id may be null for input data information events
+    this.lastDataEventSourceTA = lastEventSent.getSourceInfo().getTaskAttemptID();
+    this.lastDataEventTime = lastEventSent.getEventReceivedTime();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 3888c7a..9519fa9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -4092,7 +4092,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   @Override
   public TaskAttemptEventInfo getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
       int fromEventId, int preRoutedFromEventId, int maxEvents) {
-    ArrayList<TezEvent> events = getTask(attemptID.getTaskID()).getTaskAttemptTezEvents(
+    Task task = getTask(attemptID.getTaskID());
+    ArrayList<TezEvent> events = task.getTaskAttemptTezEvents(
         attemptID, preRoutedFromEventId, maxEvents);
     int nextPreRoutedFromEventId = preRoutedFromEventId + events.size();
     int nextFromEventId = fromEventId;
@@ -4192,6 +4193,20 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     } finally {
       onDemandRouteEventsReadLock.unlock();
     }
+    if (!events.isEmpty()) {
+      for (int i=(events.size() - 1); i>=0; --i) {
+        TezEvent lastEvent = events.get(i);
+              // record the last event sent by the AM to the task
+        EventType lastEventType = lastEvent.getEventType();
+        // if the following changes then critical path logic/recording may need revision
+        if (lastEventType == EventType.COMPOSITE_DATA_MOVEMENT_EVENT ||
+            lastEventType == EventType.DATA_MOVEMENT_EVENT ||
+            lastEventType == EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) {
+          task.getAttempt(attemptID).setLastEventSent(lastEvent);
+          break;
+        }
+      }
+    }
     return new TaskAttemptEventInfo(nextFromEventId, events, nextPreRoutedFromEventId);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index caa3432..64eb80f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -230,11 +230,12 @@ public class VertexManager {
         Collection<InputDataInformationEvent> events) {
       checkAndThrowIfDone();
       verifyIsRootInput(inputName);
+      final long currTime = appContext.getClock().getTime();
       Collection<TezEvent> tezEvents = Collections2.transform(events,
           new Function<InputDataInformationEvent, TezEvent>() {
             @Override
             public TezEvent apply(InputDataInformationEvent riEvent) {
-              TezEvent tezEvent = new TezEvent(riEvent, rootEventSourceMetadata);
+              TezEvent tezEvent = new TezEvent(riEvent, rootEventSourceMetadata, currTime);
               tezEvent.setDestinationInfo(getDestinationMetaData(inputName));
               return tezEvent;
             }

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index af529bf..52761e2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -45,6 +45,8 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
   private String diagnostics;
   private TezCounters tezCounters;
   private TaskAttemptTerminationCause error;
+  private TezTaskAttemptID lastDataEventSourceTA;
+  private long lastDataEventTime;
 
   public TaskAttemptFinishedEvent(TezTaskAttemptID taId,
       String vertexName,
@@ -52,7 +54,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
       long finishTime,
       TaskAttemptState state,
       TaskAttemptTerminationCause error,
-      String diagnostics, TezCounters counters) {
+      String diagnostics, TezCounters counters, 
+      long lastDataEventTime, 
+      TezTaskAttemptID lastDataEventSourceTA) {
     this.taskAttemptId = taId;
     this.vertexName = vertexName;
     this.startTime = startTime;
@@ -61,6 +65,8 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
     this.diagnostics = diagnostics;
     this.tezCounters = counters;
     this.error = error;
+    this.lastDataEventTime = lastDataEventTime;
+    this.lastDataEventSourceTA = lastDataEventSourceTA;
   }
 
   public TaskAttemptFinishedEvent() {
@@ -80,6 +86,14 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
   public boolean isHistoryEvent() {
     return true;
   }
+  
+  public long getLastDataEventTime() {
+    return lastDataEventTime;
+  }
+  
+  public TezTaskAttemptID getLastDataEventSourceTA() {
+    return lastDataEventSourceTA;
+  }
 
   public TaskAttemptFinishedProto toProto() {
     TaskAttemptFinishedProto.Builder builder =
@@ -96,6 +110,10 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
     if (tezCounters != null) {
       builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
     }
+    if (lastDataEventSourceTA != null) {
+      builder.setLastDataEventSourceTA(lastDataEventSourceTA.toString());
+      builder.setLastDataEventTime(lastDataEventTime);
+    }
     return builder.build();
   }
 
@@ -113,6 +131,10 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
       this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
         proto.getCounters());
     }
+    if (proto.hasLastDataEventSourceTA()) {
+      this.lastDataEventSourceTA = TezTaskAttemptID.fromString(proto.getLastDataEventSourceTA());
+      this.lastDataEventTime = proto.getLastDataEventTime();
+    }
   }
 
   @Override
@@ -140,6 +162,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
         + ", status=" + state.name()
         + ", errorEnum=" + (error != null ? error.name() : "")
         + ", diagnostics=" + diagnostics
+        + ", lastDataEventSourceTA=" + 
+              ((lastDataEventSourceTA==null) ? null:lastDataEventSourceTA.toString())
+        + ", lastDataEventTime=" + lastDataEventTime
         + ", counters=" + (tezCounters == null ? "null" :
           tezCounters.toString()
             .replaceAll("\\n", ", ").replaceAll("\\s+", " "));

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
index 0310a26..6f44f33 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
@@ -145,6 +145,7 @@ public class VertexRecoverableEventsGeneratedEvent implements HistoryEvent {
         if (event.getDestinationInfo() != null) {
           evtBuilder.setDestinationInfo(convertEventMetaDataToProto(event.getDestinationInfo()));
         }
+        evtBuilder.setEventTime(event.getEventReceivedTime());
         tezEventProtos.add(evtBuilder.build());
       }
     }
@@ -184,7 +185,7 @@ public class VertexRecoverableEventsGeneratedEvent implements HistoryEvent {
       if (eventProto.hasDestinationInfo()) {
         destinationInfo = convertEventMetaDataFromProto(eventProto.getDestinationInfo());
       }
-      TezEvent tezEvent = new TezEvent(evt, sourceInfo);
+      TezEvent tezEvent = new TezEvent(evt, sourceInfo, eventProto.getEventTime());
       tezEvent.setDestinationInfo(destinationInfo);
       this.events.add(tezEvent);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 3fdfe0a..528da10 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -530,6 +530,10 @@ public class HistoryEventJsonConversion {
     otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
     otherInfo.put(ATSConstants.COUNTERS,
         DAGUtils.convertCountersToJSON(event.getCounters()));
+    otherInfo.put(ATSConstants.LAST_DATA_EVENT_TIME, event.getLastDataEventTime());
+    if (event.getLastDataEventSourceTA() != null) {
+      otherInfo.put(ATSConstants.LAST_DATA_EVENT_SOURCE_TA, event.getLastDataEventSourceTA().toString());
+    }
     jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
 
     return jsonObject;

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index 402349b..ffb382e 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -175,6 +175,8 @@ message TaskAttemptFinishedProto {
   optional string diagnostics = 4;
   optional TezCountersProto counters = 5;
   optional string error_enum = 6;
+  optional int64 last_data_event_time = 7;
+  optional string last_data_event_source_t_a = 8;
 }
 
 message EventMetaDataProto {
@@ -191,6 +193,7 @@ message TezDataMovementEventProto {
   optional CompositeEventProto composite_data_movement_event = 4;
   optional RootInputDataInformationEventProto root_input_data_information_event = 5;
   optional RootInputInitializerEventProto input_initializer_event = 6;
+  optional int64 event_time = 7;
 }
 
 message VertexDataMovementEventsGeneratedProto {

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 08f6ff6..8fa57d3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -114,7 +114,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
   }
   
   public static interface EventsDelegate {
-    public void getEvents(TaskSpec taskSpec, List<TezEvent> events);
+    public void getEvents(TaskSpec taskSpec, List<TezEvent> events, long time);
   }
 
   // mock container launcher does not launch real tasks.
@@ -408,7 +408,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
               List<TezEvent> events = Lists.newArrayListWithCapacity(
                                       cData.taskSpec.getOutputs().size() + 1);
               if (cData.numUpdates == 0 && eventsDelegate != null) {
-                eventsDelegate.getEvents(cData.taskSpec, events);
+                eventsDelegate.getEvents(cData.taskSpec, events, getContext().getClock().getTime());
               }
               TezCounters counters = null;
               if (countersDelegate != null) {
@@ -422,7 +422,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
               float maxUpdates = (updatesToMake != null) ? updatesToMake.intValue() : 1;
               float progress = updateProgress ? cData.numUpdates/maxUpdates : 0f;
               events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats), new EventMetaData(
-                  EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId)));
+                  EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId),
+                  getContext().getClock().getTime()));
               TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events,
                   cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 50000);
               doHeartbeat(request, cData);
@@ -433,7 +434,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
               cData.completed = true;
               List<TezEvent> events = Collections.singletonList(new TezEvent(
                   new TaskAttemptCompletedEvent(), new EventMetaData(
-                      EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId)));
+                      EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId),
+                  getContext().getClock().getTime()));
               TezHeartbeatRequest request = new TezHeartbeatRequest(++cData.numUpdates, events,
                   cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 10000);
               doHeartbeat(request, cData);

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 4137d42..42d4b0b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -126,17 +126,17 @@ public class TestMockDAGAppMaster {
   
   static class TestEventsDelegate implements EventsDelegate {
     @Override
-    public void getEvents(TaskSpec taskSpec, List<TezEvent> events) {
+    public void getEvents(TaskSpec taskSpec, List<TezEvent> events, long time) {
       for (OutputSpec output : taskSpec.getOutputs()) {
         if (output.getPhysicalEdgeCount() == 1) {
           events.add(new TezEvent(DataMovementEvent.create(0, 0, 0, null), new EventMetaData(
               EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), output
-                  .getDestinationVertexName(), taskSpec.getTaskAttemptID())));
+                  .getDestinationVertexName(), taskSpec.getTaskAttemptID()), time));
         } else {
           events.add(new TezEvent(CompositeDataMovementEvent.create(0,
               output.getPhysicalEdgeCount(), null), new EventMetaData(
               EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), output
-                  .getDestinationVertexName(), taskSpec.getTaskAttemptID())));
+                  .getDestinationVertexName(), taskSpec.getTaskAttemptID()), time));
         }
       }
     }    

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 5c24ecc..d8a7388 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -99,12 +99,15 @@ public class TestTaskAttemptListenerImplTezDag {
 
     eventHandler = mock(EventHandler.class);
 
+    MockClock clock = new MockClock();
+    
     appContext = mock(AppContext.class);
     doReturn(eventHandler).when(appContext).getEventHandler();
     doReturn(dag).when(appContext).getCurrentDAG();
     doReturn(appAcls).when(appContext).getApplicationACLs();
     doReturn(amContainerMap).when(appContext).getAllContainers();
-
+    doReturn(clock).when(appContext).getClock();
+    
     taskAttemptListener = new TaskAttemptListenerImplForTest(appContext,
         mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index d632aa3..920109b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -169,9 +169,11 @@ public class TestTaskAttemptRecovery {
       errorEnum = TaskAttemptTerminationCause.APPLICATION_ERROR;
     }
 
+    long lastDataEventTime = 1024;
+    TezTaskAttemptID lastDataEventTA = mock(TezTaskAttemptID.class);
     TaskAttemptState recoveredState =
         ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            startTime, finishTime, state, errorEnum, diag, counters));
+            startTime, finishTime, state, errorEnum, diag, counters, lastDataEventTime, lastDataEventTA));
     assertEquals(startTime, ta.getLaunchTime());
     assertEquals(finishTime, ta.getFinishTime());
     assertEquals(counters, ta.reportedStatus.counters);
@@ -180,6 +182,8 @@ public class TestTaskAttemptRecovery {
     assertEquals(1, ta.getDiagnostics().size());
     assertEquals(diag, ta.getDiagnostics().get(0));
     assertEquals(state, recoveredState);
+    assertEquals(lastDataEventTime, ta.lastDataEventTime);
+    assertEquals(lastDataEventTA, ta.lastDataEventSourceTA);
     if (state != TaskAttemptState.SUCCEEDED) {
       assertEquals(errorEnum, ta.getTerminationCause());
     } else {
@@ -304,7 +308,7 @@ public class TestTaskAttemptRecovery {
     TaskAttemptState recoveredState =
         ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             startTime, finishTime, TaskAttemptState.KILLED,
-            TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters()));
+            TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters(), 0, null));
     assertEquals(TaskAttemptState.KILLED, recoveredState);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index feb290f..87e7498 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -286,7 +286,7 @@ public class TestTaskRecovery {
     restoreFromTaskStartEvent();
     TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
     task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-        0L, 0L, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"", new TezCounters()));
+        0L, 0L, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"", new TezCounters(), 0, null));
     task.handle(new TaskEventRecoverTask(task.getTaskId()));
     // wait for the second task attempt is scheduled
     dispatcher.await();
@@ -307,7 +307,7 @@ public class TestTaskRecovery {
     restoreFromTaskStartEvent();
     TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
     task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-        0L, 0L, TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"", new TezCounters()));
+        0L, 0L, TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"", new TezCounters(), 0, null));
     task.handle(new TaskEventRecoverTask(task.getTaskId()));
     // wait for the second task attempt is scheduled
     dispatcher.await();
@@ -329,7 +329,7 @@ public class TestTaskRecovery {
     TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
     try {
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-          0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters()));
+          0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters(), 0, null));
       fail("Should fail due to no TaskAttemptStartedEvent but with TaskAttemptFinishedEvent(Succeeded)");
     } catch (TezUncheckedException e) {
       assertTrue(e.getMessage().contains("Could not find task attempt when trying to recover"));
@@ -372,7 +372,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), 0, null));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -405,7 +405,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.FAILED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), 0, null));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -438,7 +438,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), 0, null));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -473,7 +473,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), 0, null));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -516,7 +516,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), 0, null));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -528,7 +528,7 @@ public class TestTaskRecovery {
     recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.FAILED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), 0, null));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -563,7 +563,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), 0, null));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -575,7 +575,7 @@ public class TestTaskRecovery {
     recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), 0, null));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -614,7 +614,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), 0, null));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -654,7 +654,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), 0, null));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -735,7 +735,7 @@ public class TestTaskRecovery {
     recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), 0, null));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(TaskAttemptStateInternal.NEW,
         ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState());
@@ -776,7 +776,7 @@ public class TestTaskRecovery {
       task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
           mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
-          0, TaskAttemptState.KILLED, null, "", null));
+          0, TaskAttemptState.KILLED, null, "", null, 0, null));
     }
     assertEquals(maxFailedAttempts, task.getAttempts().size());
     assertEquals(0, task.failedAttempts);
@@ -806,7 +806,7 @@ public class TestTaskRecovery {
       task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
           mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
-          0, TaskAttemptState.FAILED, null, "", null));
+          0, TaskAttemptState.FAILED, null, "", null, 0, null));
     }
     assertEquals(maxFailedAttempts, task.getAttempts().size());
     assertEquals(maxFailedAttempts, task.failedAttempts);
@@ -836,7 +836,7 @@ public class TestTaskRecovery {
       task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
           mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
-          0, TaskAttemptState.FAILED, null, "", null));
+          0, TaskAttemptState.FAILED, null, "", null, 0, null));
     }
     assertEquals(maxFailedAttempts - 1, task.getAttempts().size());
     assertEquals(maxFailedAttempts - 1, task.failedAttempts);

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 98ef973..c55ea23 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -110,6 +110,7 @@ import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
+import org.apache.tez.dag.app.MockClock;
 import org.apache.tez.dag.app.TaskAttemptEventInfo;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
@@ -2179,6 +2180,8 @@ public class TestVertexImpl {
       }})
     .when(execService).submit((Callable<Void>) any());
     
+    MockClock clock = new MockClock();
+    
     doReturn(execService).when(appContext).getExecService();
     doReturn(conf).when(appContext).getAMConf();
     doReturn(new Credentials()).when(dag).getCredentials();
@@ -2189,7 +2192,8 @@ public class TestVertexImpl {
     doReturn(Resource.newInstance(102400, 60)).when(taskScheduler).getTotalResources();
     doReturn(historyEventHandler).when(appContext).getHistoryHandler();
     doReturn(dispatcher.getEventHandler()).when(appContext).getEventHandler();
-
+    doReturn(clock).when(appContext).getClock();
+    
     vertexGroups = Maps.newHashMap();
     for (PlanVertexGroupInfo groupInfo : dagPlan.getVertexGroupsList()) {
       vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 9be3531..a32cc27 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -505,7 +505,9 @@ public class TestHistoryEventsProtoConversion {
           TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
           "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
-          null, null, null);
+          null, null, null, 1024, 
+          TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 110), 1), 1));
       TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(event.getTaskAttemptID(),
@@ -518,6 +520,8 @@ public class TestHistoryEventsProtoConversion {
           deserializedEvent.getState());
       Assert.assertEquals(event.getCounters(),
           deserializedEvent.getCounters());
+      Assert.assertEquals(event.getLastDataEventTime(), deserializedEvent.getLastDataEventTime());
+      Assert.assertEquals(event.getLastDataEventSourceTA(), deserializedEvent.getLastDataEventSourceTA());
       logEvents(event, deserializedEvent);
     }
     {
@@ -525,7 +529,7 @@ public class TestHistoryEventsProtoConversion {
           TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
           "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
-          TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters());
+          TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters(), 0, null);
       TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(event.getTaskAttemptID(),
@@ -588,9 +592,10 @@ public class TestHistoryEventsProtoConversion {
     } catch (RuntimeException e) {
       // Expected
     }
+    long eventTime = 1024;
     List<TezEvent> events =
         Arrays.asList(new TezEvent(DataMovementEvent.create(1, null),
-            new EventMetaData(EventProducerConsumerType.SYSTEM, "foo", "bar", null)));
+            new EventMetaData(EventProducerConsumerType.SYSTEM, "foo", "bar", null), eventTime));
     event = new VertexRecoverableEventsGeneratedEvent(
             TezVertexID.getInstance(
                 TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), events);
@@ -601,6 +606,8 @@ public class TestHistoryEventsProtoConversion {
         deserializedEvent.getTezEvents().size());
     Assert.assertEquals(event.getTezEvents().get(0).getEventType(),
         deserializedEvent.getTezEvents().get(0).getEventType());
+    Assert.assertEquals(event.getTezEvents().get(0).getEventReceivedTime(),
+        deserializedEvent.getTezEvents().get(0).getEventReceivedTime());
     logEvents(event, deserializedEvent);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index db871a2..ec1603e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -164,7 +164,7 @@ public class TestHistoryEventJsonConversion {
           break;
         case TASK_ATTEMPT_FINISHED:
           event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
-              random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null);
+              random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null, 0, null);
           break;
         case CONTAINER_LAUNCHED:
           event = new ContainerLaunchedEvent(containerId, random.nextInt(),

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
index cca984a..b412c46 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
@@ -20,6 +20,8 @@ package org.apache.tez.history.parser.datamodel;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+
+import org.apache.tez.common.ATSConstants;
 import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
@@ -46,6 +48,9 @@ public class TaskAttemptInfo extends BaseInfo {
   private final String status;
   private final String logUrl;
   private final String schedulingCausalTA;
+  private final long lastDataEventTime;
+  private final String lastDataEventSourceTA;
+
   private TaskInfo taskInfo;
 
   private Container container;
@@ -75,6 +80,8 @@ public class TaskAttemptInfo extends BaseInfo {
 
     status = otherInfoNode.optString(Constants.STATUS);
     container = new Container(containerId, nodeId);
+    lastDataEventTime = otherInfoNode.optLong(ATSConstants.LAST_DATA_EVENT_TIME);
+    lastDataEventSourceTA = otherInfoNode.optString(ATSConstants.LAST_DATA_EVENT_SOURCE_TA);
   }
 
   void setTaskInfo(TaskInfo taskInfo) {
@@ -104,6 +111,14 @@ public class TaskAttemptInfo extends BaseInfo {
   public final long getAbsoluteScheduledTime() {
     return scheduledTime;
   }
+  
+  public final long getLastDataEventTime() {
+    return lastDataEventTime;
+  }
+  
+  public final String getLastDataEventSourceTA() {
+    return lastDataEventSourceTA;
+  }
 
   public final long getTimeTaken() {
     return getFinishTime() - getStartTime();

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
index 1d59e98..d205056 100644
--- a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
+++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
@@ -80,6 +80,7 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertTrue;
@@ -105,7 +106,6 @@ public class TestATSFileParser {
       "target" + Path.SEPARATOR + TestATSFileParser.class.getName() + "-tez";
   private static String DOWNLOAD_DIR = TEST_ROOT_DIR + Path.SEPARATOR + "download";
 
-  private static String timelineAddress;
   private static TezClient tezClient;
 
   private static int dagNumber;
@@ -228,8 +228,11 @@ public class TestATSFileParser {
     assertTrue(edgeInfo.getEdgeSourceClass().equals(OrderedPartitionedKVOutput.class.getName()));
     assertTrue(edgeInfo.getEdgeDestinationClass().equals(OrderedGroupedKVInput.class.getName()));
     assertTrue(dagInfo.getVertices().size() == 2);
+    String lastSourceTA = null;
+    String lastDataEventSourceTA = null;
     for (VertexInfo vertexInfo : dagInfo.getVertices()) {
       assertTrue(vertexInfo.getKilledTasksCount() == 0);
+      long finishTime = 0;
       for (TaskInfo taskInfo : vertexInfo.getTasks()) {
         assertTrue(taskInfo.getNumberOfTaskAttempts() == 1);
         assertTrue(taskInfo.getMaxTaskAttemptDuration() >= 0);
@@ -240,6 +243,24 @@ public class TestATSFileParser {
         assertTrue(taskInfo.getSuccessfulTaskAttempts().size() > 0);
         assertTrue(taskInfo.getFailedTaskAttempts().size() == 0);
         assertTrue(taskInfo.getKilledTaskAttempts().size() == 0);
+        List<TaskAttemptInfo> attempts = taskInfo.getTaskAttempts();
+        if (vertexInfo.getVertexName().equals(TOKENIZER)) {
+          // get the last task to finish and track its successful attempt
+          if (finishTime < taskInfo.getAbsFinishTime()) {
+            finishTime = taskInfo.getAbsFinishTime();
+            lastSourceTA = taskInfo.getSuccessfulAttemptId();
+          }
+        } else {
+          for (TaskAttemptInfo attempt : attempts) {
+            assertTrue(attempt.getLastDataEventTime() > 0);
+            if (lastDataEventSourceTA == null) {
+              lastDataEventSourceTA = attempt.getLastDataEventSourceTA();
+            } else {
+              // all attempts should have the same last data event source TA
+              assertTrue(lastDataEventSourceTA.equals(attempt.getLastDataEventSourceTA()));
+            }
+          }
+        }
         for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
           assertTrue(attemptInfo.getStartTime() > 0);
           assertTrue(attemptInfo.getScheduledTime() > 0);
@@ -258,6 +279,7 @@ public class TestATSFileParser {
         assertTrue(vertexInfo.getInputVertices().size() == 1);
       }
     }
+    assertTrue(lastSourceTA.equals(lastDataEventSourceTA));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 95f77e2..eaed115 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -432,7 +432,11 @@ public class HistoryEventTimelineConversion {
     atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
     atsEntity.addOtherInfo(ATSConstants.COUNTERS,
         DAGUtils.convertCountersToATSMap(event.getCounters()));
-
+    atsEntity.addOtherInfo(ATSConstants.LAST_DATA_EVENT_TIME, event.getLastDataEventTime());
+    if (event.getLastDataEventSourceTA() != null) {
+      atsEntity.addOtherInfo(ATSConstants.LAST_DATA_EVENT_SOURCE_TA,
+          event.getLastDataEventSourceTA().toString());
+    }
     return atsEntity;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index bf8d0ec..838d9d6 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -169,7 +169,7 @@ public class TestHistoryEventTimelineConversion {
           break;
         case TASK_ATTEMPT_FINISHED:
           event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
-              random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null);
+              random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null, 0, null);
           break;
         case CONTAINER_LAUNCHED:
           event = new ContainerLaunchedEvent(containerId, random.nextInt(),
@@ -450,9 +450,10 @@ public class TestHistoryEventTimelineConversion {
         .values()[random.nextInt(TaskAttemptTerminationCause.values().length)];
     String diagnostics = "random diagnostics message";
     TezCounters counters = new TezCounters();
+    long lastDataEventTime = finishTime - 1;
 
     TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName,
-        startTime, finishTime, state, error, diagnostics, counters);
+        startTime, finishTime, state, error, diagnostics, counters, lastDataEventTime, tezTaskAttemptID);
     TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
     Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId());
     Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
@@ -475,12 +476,14 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertEquals(finishTime, evt.getTimestamp());
 
     final Map<String, Object> otherInfo = timelineEntity.getOtherInfo();
-    Assert.assertEquals(6, otherInfo.size());
+    Assert.assertEquals(8, otherInfo.size());
     Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME));
     Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN));
     Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS));
     Assert.assertEquals(error.name(), otherInfo.get(ATSConstants.TASK_ATTEMPT_ERROR_ENUM));
     Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS));
+    Assert.assertEquals(lastDataEventTime, otherInfo.get(ATSConstants.LAST_DATA_EVENT_TIME));
+    Assert.assertEquals(tezTaskAttemptID.toString(), otherInfo.get(ATSConstants.LAST_DATA_EVENT_SOURCE_TA));
     Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS));
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6a99798f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index 974e190..b44b7d4 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -55,12 +55,19 @@ public class TezEvent implements Writable {
   private EventMetaData sourceInfo;
 
   private EventMetaData destinationInfo;
+  
+  private long eventReceivedTime;
 
   public TezEvent() {
   }
 
   public TezEvent(Event event, EventMetaData sourceInfo) {
+    this(event, sourceInfo, System.currentTimeMillis());
+  }
+  
+  public TezEvent(Event event, EventMetaData sourceInfo, long time) {
     this.event = event;
+    this.eventReceivedTime = time;
     this.setSourceInfo(sourceInfo);
     if (event instanceof DataMovementEvent) {
       eventType = EventType.DATA_MOVEMENT_EVENT;
@@ -91,6 +98,14 @@ public class TezEvent implements Writable {
   public Event getEvent() {
     return event;
   }
+  
+  public void setEventReceivedTime(long eventReceivedTime) { // TODO save
+    this.eventReceivedTime = eventReceivedTime;
+  }
+  
+  public long getEventReceivedTime() {
+    return eventReceivedTime;
+  }
 
   public EventMetaData getSourceInfo() {
     return sourceInfo;
@@ -119,6 +134,7 @@ public class TezEvent implements Writable {
     }
     out.writeBoolean(true);
     out.writeInt(eventType.ordinal());
+    out.writeLong(eventReceivedTime);
     if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
       // TODO NEWTEZ convert to PB
       TaskStatusUpdateEvent sEvt = (TaskStatusUpdateEvent) event;
@@ -188,6 +204,7 @@ public class TezEvent implements Writable {
       return;
     }
     eventType = EventType.values()[in.readInt()];
+    eventReceivedTime = in.readLong();
     if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
       // TODO NEWTEZ convert to PB
       event = new TaskStatusUpdateEvent();