You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/08/14 02:54:02 UTC

tez git commit: TEZ-2701. Add time at which container was allocated to attempt (bikas)

Repository: tez
Updated Branches:
  refs/heads/master b8e8bcbd0 -> 6b67b0bc1


TEZ-2701. Add time at which container was allocated to attempt (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6b67b0bc
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6b67b0bc
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6b67b0bc

Branch: refs/heads/master
Commit: 6b67b0bc1eb010f6dc8af2936ae738909e1244ff
Parents: b8e8bcb
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Aug 13 17:53:52 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Aug 13 17:53:52 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  5 +-
 .../org/apache/tez/common/ATSConstants.java     |  4 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 52 ++++++++++++++++----
 .../tez/dag/app/rm/container/AMContainer.java   |  1 +
 .../dag/app/rm/container/AMContainerImpl.java   | 12 +++++
 .../history/events/TaskAttemptStartedEvent.java | 41 +++++++++------
 .../impl/HistoryEventJsonConversion.java        |  7 +--
 tez-dag/src/main/proto/HistoryEvents.proto      |  5 +-
 .../app/dag/impl/TestTaskAttemptRecovery.java   | 12 +++--
 .../tez/dag/app/dag/impl/TestTaskRecovery.java  | 12 ++---
 .../dag/app/rm/container/TestAMContainer.java   |  5 +-
 .../TestHistoryEventsProtoConversion.java       | 12 +++--
 .../impl/TestHistoryEventJsonConversion.java    |  2 +-
 .../parser/datamodel/TaskAttemptInfo.java       | 37 +++++++++-----
 .../apache/tez/history/TestHistoryParser.java   | 17 ++++---
 .../ats/HistoryEventTimelineConversion.java     |  9 ++--
 .../ats/TestHistoryEventTimelineConversion.java | 12 +++--
 17 files changed, 167 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 432c82b..bbe9321 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,14 +8,15 @@ INCOMPATIBLE CHANGES
   TEZ-2048. Remove VertexManagerPluginContext.getTaskContainer()
   TEZ-2565. Consider scanning unfinished tasks in VertexImpl::constructStatistics to reduce merge overhead.
   TEZ-2468. Change the minimum Java version to Java 7.
+
+ALL CHANGES:
   TEZ-2646. Add scheduling casual dependency for attempts
   TEZ-2647. Add input causality dependency for attempts
   TEZ-2633. Allow VertexManagerPlugins to receive and report based on attempts
   instead of tasks
   TEZ-2650. Timing details on Vertex state changes
   TEZ-2699. Internalize strings in ATF parser
-
-ALL CHANGES:
+  TEZ-2701. Add time at which container was allocated to attempt
   TEZ-2683. TestHttpConnection::testAsyncHttpConnectionInterrupt fails in certain environments.
   TEZ-2692. bugfixes & enhancements related to job parser and analyzer.
   TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException from a dying AM.

http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index 1568b96..4566a91 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -57,6 +57,8 @@ public class ATSConstants {
   public static final String VERTEX_NAME = "vertexName";
   public static final String VERTEX_NAME_ID_MAPPING = "vertexNameIdMapping";
   public static final String SCHEDULED_TIME = "scheduledTime";
+  public static final String CREATION_TIME = "creationTime";
+  public static final String ALLOCATION_TIME = "allocationTime";
   public static final String INIT_REQUESTED_TIME = "initRequestedTime";
   public static final String INIT_TIME = "initTime";
   public static final String START_REQUESTED_TIME = "startRequestedTime";
@@ -84,7 +86,7 @@ public class ATSConstants {
   public static final String LAST_DATA_EVENT_TIME = "lastDataEventTime";
   public static final String LAST_DATA_EVENT_SOURCE_TA = "lastDataEventSourceTA";
   public static final String UPDATED_EDGE_MANAGERS = "updatedEdgeManagers";
-  public static final String SCHEDULING_CAUSAL_ATTEMPT = "schedulingCausalAttempt";
+  public static final String CREATION_CAUSAL_ATTEMPT = "creationCausalAttempt";
 
   /* Counters-related keys */
   public static final String COUNTER_GROUPS = "counterGroups";

http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index ebf7c58..e5a6f84 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -86,6 +86,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.dag.event.SpeculatorEventTaskAttemptStatusUpdate;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
+import org.apache.tez.dag.app.rm.container.AMContainer;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
@@ -138,6 +139,7 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   // TODO Can these be replaced by the container object TEZ-1037
   private Container container;
+  private long allocationTime;
   private ContainerId containerId;
   private NodeId containerNodeId;
   private String nodeHttpAddress;
@@ -170,8 +172,8 @@ public class TaskAttemptImpl implements TaskAttempt,
   private final ContainerContext containerContext;
   private final boolean leafVertex;
   
-  private TezTaskAttemptID schedulingCausalTA;
-  private long scheduledTime;
+  private TezTaskAttemptID creationCausalTA;
+  private long creationTime;
 
   protected static final FailedTransitionHelper FAILED_HELPER =
       new FailedTransitionHelper();
@@ -411,8 +413,8 @@ public class TaskAttemptImpl implements TaskAttempt,
     this.appContext = appContext;
     this.task = task;
     this.vertex = this.task.getVertex();
-    this.schedulingCausalTA = schedulingCausalTA;
-    this.scheduledTime = clock.getTime();
+    this.creationCausalTA = schedulingCausalTA;
+    this.creationTime = clock.getTime();
 
     this.reportedStatus = new TaskAttemptStatus(this.attemptId);
     initTaskAttemptStatus(reportedStatus);
@@ -446,7 +448,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   }
   
   public TezTaskAttemptID getSchedulingCausalTA() {
-    return schedulingCausalTA;
+    return creationCausalTA;
   }
 
   TaskSpec createRemoteTaskSpec() throws AMUserCodeException {
@@ -646,6 +648,33 @@ public class TaskAttemptImpl implements TaskAttempt,
     }
   }
 
+  public long getCreationTime() {
+    readLock.lock();
+    try {
+      return creationTime;
+    } finally {
+      readLock.unlock();
+    }
+  }
+  
+  public TezTaskAttemptID getCreationCausalAttempt() {
+    readLock.lock();
+    try {
+      return creationCausalTA;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public long getAllocationTime() {
+    readLock.lock();
+    try {
+      return allocationTime;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   @Override
   public long getFinishTime() {
     readLock.lock();
@@ -739,8 +768,9 @@ public class TaskAttemptImpl implements TaskAttempt,
         {
           TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) historyEvent;
           this.launchTime = tEvent.getStartTime();
-          this.scheduledTime = tEvent.getScheduledTime();
-          this.schedulingCausalTA = tEvent.getSchedulingCausalTA();
+          this.creationTime = tEvent.getCreationTime();
+          this.allocationTime = tEvent.getAllocationTime();
+          this.creationCausalTA = tEvent.getCreationCausalTA();
           recoveryStartEventSeen = true;
           recoveredState = TaskAttemptState.RUNNING;
           this.containerId = tEvent.getContainerId();
@@ -963,7 +993,8 @@ public class TaskAttemptImpl implements TaskAttempt,
     TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent(
         attemptId, getVertex().getName(),
         launchTime, containerId, containerNodeId,
-        inProgressLogsUrl, completedLogsUrl, nodeHttpAddress, scheduledTime, schedulingCausalTA);
+        inProgressLogsUrl, completedLogsUrl, nodeHttpAddress, creationTime, creationCausalTA, 
+        allocationTime);
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGID(), startEvt));
   }
@@ -1114,9 +1145,10 @@ public class TaskAttemptImpl implements TaskAttempt,
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent origEvent) {
       TaskAttemptEventStartedRemotely event = (TaskAttemptEventStartedRemotely) origEvent;
 
-      Container container = ta.appContext.getAllContainers()
-          .get(event.getContainerId()).getContainer();
+      AMContainer amContainer = ta.appContext.getAllContainers().get(event.getContainerId()); 
+      Container container = amContainer.getContainer();
 
+      ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime();
       ta.container = container;
       ta.containerId = event.getContainerId();
       ta.containerNodeId = container.getNodeId();

http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
index a6b403d..7d6da8a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
@@ -32,5 +32,6 @@ public interface AMContainer extends EventHandler<AMContainerEvent>{
   public Container getContainer();
   public List<TezTaskAttemptID> getAllTaskAttempts();
   public TezTaskAttemptID getCurrentTaskAttempt();
+  public long getCurrentTaskAttemptAllocationTime();
   
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 330f2b7..9b90752 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -95,6 +95,7 @@ public class AMContainerImpl implements AMContainer {
   private boolean nodeFailed = false;
 
   private TezTaskAttemptID currentAttempt;
+  private long currentAttemptAllocationTime;
   private List<TezTaskAttemptID> failedAssignments;
 
   private boolean inError = false;
@@ -362,6 +363,16 @@ public class AMContainerImpl implements AMContainer {
     }
   }
 
+  @Override
+  public long getCurrentTaskAttemptAllocationTime() {
+    readLock.lock();
+    try {
+      return this.currentAttemptAllocationTime;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   public boolean isInErrorState() {
     return inError;
   }
@@ -532,6 +543,7 @@ public class AMContainerImpl implements AMContainer {
       // Register the additional resources back for this container.
       container.containerLocalResources.putAll(container.additionalLocalResources);
       container.currentAttempt = event.getTaskAttemptId();
+      container.currentAttemptAllocationTime = container.appContext.getClock().getTime();
       if (LOG.isDebugEnabled()) {
         LOG.debug("AssignTA: attempt: " + event.getRemoteTaskSpec());
         LOG.debug("AdditionalLocalResources: " + container.additionalLocalResources);

http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index 8eb074d..4d15fb9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -40,14 +40,15 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
   private ContainerId containerId;
   private NodeId nodeId;
   private String nodeHttpAddress;
-  private TezTaskAttemptID schedulingCausalTA;
-  private long scheduledTime;
+  private TezTaskAttemptID creationCausalTA;
+  private long creationTime;
+  private long allocationTime;
 
   public TaskAttemptStartedEvent(TezTaskAttemptID taId,
       String vertexName, long launchTime,
       ContainerId containerId, NodeId nodeId,
       String inProgressLogsUrl, String completedLogsUrl,
-      String nodeHttpAddress, long scheduledTime, TezTaskAttemptID schedulingCausalTA) {
+      String nodeHttpAddress, long creationTime, TezTaskAttemptID creationCausalTA, long allocationTime) {
     this.taskAttemptId = taId;
     this.vertexName = vertexName;
     this.launchTime = launchTime;
@@ -56,8 +57,9 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
     this.inProgressLogsUrl = inProgressLogsUrl;
     this.completedLogsUrl = completedLogsUrl;
     this.nodeHttpAddress = nodeHttpAddress;
-    this.scheduledTime = scheduledTime;
-    this.schedulingCausalTA = schedulingCausalTA;
+    this.creationTime = creationTime;
+    this.creationCausalTA = creationCausalTA;
+    this.allocationTime = allocationTime;
   }
 
   public TaskAttemptStartedEvent() {
@@ -84,9 +86,10 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
         .setStartTime(launchTime)
         .setContainerId(containerId.toString())
         .setNodeId(nodeId.toString())
-        .setScheduledTime(scheduledTime);
-    if (schedulingCausalTA != null) {
-      builder.setSchedulingCausalTA(schedulingCausalTA.toString());
+        .setCreationTime(creationTime)
+        .setAllocationTime(allocationTime);
+    if (creationCausalTA != null) {
+      builder.setCreationCausalTA(creationCausalTA.toString());
     }
     return builder.build();
   }
@@ -96,9 +99,10 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
     this.launchTime = proto.getStartTime();
     this.containerId = ConverterUtils.toContainerId(proto.getContainerId());
     this.nodeId = ConverterUtils.toNodeId(proto.getNodeId());
-    this.scheduledTime = proto.getScheduledTime();
-    if (proto.hasSchedulingCausalTA()) {
-      this.schedulingCausalTA = TezTaskAttemptID.fromString(proto.getSchedulingCausalTA());
+    this.creationTime = proto.getCreationTime();
+    this.allocationTime = proto.getAllocationTime();
+    if (proto.hasCreationCausalTA()) {
+      this.creationCausalTA = TezTaskAttemptID.fromString(proto.getCreationCausalTA());
     }
   }
 
@@ -120,7 +124,8 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
   public String toString() {
     return "vertexName=" + vertexName
         + ", taskAttemptId=" + taskAttemptId
-        + ", scheduledTime=" + scheduledTime
+        + ", creationTime=" + creationTime
+        + ", allocationTime=" + allocationTime
         + ", startTime=" + launchTime
         + ", containerId=" + containerId
         + ", nodeId=" + nodeId
@@ -136,12 +141,16 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
     return launchTime;
   }
   
-  public long getScheduledTime() {
-    return scheduledTime;
+  public long getCreationTime() {
+    return creationTime;
   }
   
-  public TezTaskAttemptID getSchedulingCausalTA() {
-    return schedulingCausalTA;
+  public long getAllocationTime() {
+    return allocationTime;
+  }
+  
+  public TezTaskAttemptID getCreationCausalTA() {
+    return creationCausalTA;
   }
 
   public ContainerId getContainerId() {

http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 528da10..b32b324 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -577,9 +577,10 @@ public class HistoryEventJsonConversion {
     JSONObject otherInfo = new JSONObject();
     otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
     otherInfo.put(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
-    otherInfo.put(ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
-    if (event.getSchedulingCausalTA() != null) {
-      otherInfo.put(ATSConstants.SCHEDULING_CAUSAL_ATTEMPT, event.getSchedulingCausalTA().toString());
+    otherInfo.put(ATSConstants.CREATION_TIME, event.getCreationTime());
+    otherInfo.put(ATSConstants.ALLOCATION_TIME, event.getAllocationTime());
+    if (event.getCreationCausalTA() != null) {
+      otherInfo.put(ATSConstants.CREATION_CAUSAL_ATTEMPT, event.getCreationCausalTA().toString());
     }
     jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index ffb382e..e268e0d 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -164,8 +164,9 @@ message TaskAttemptStartedProto {
   optional int64 start_time = 2;
   optional string container_id = 3;
   optional string node_id = 4;
-  optional int64 scheduled_time = 5;
-  optional string scheduling_causal_t_a = 6;
+  optional int64 creation_time = 5;
+  optional string creation_causal_t_a = 6;
+  optional int64 allocation_time = 7;
 }
 
 message TaskAttemptFinishedProto {

http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index 920109b..4a797e0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -19,7 +19,6 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -71,7 +70,9 @@ public class TestTaskAttemptRecovery {
 
   private TaskAttemptImpl ta;
   private EventHandler mockEventHandler;
-  private long startTime = System.currentTimeMillis();
+  private long creationTime = System.currentTimeMillis();
+  private long allocationTime = creationTime + 5000;
+  private long startTime = allocationTime + 5000;
   private long finishTime = startTime + 5000;
 
   private TezTaskAttemptID taId;
@@ -153,9 +154,14 @@ public class TestTaskAttemptRecovery {
   }
 
   private void restoreFromTAStartEvent() {
+    TezTaskAttemptID causalId = TezTaskAttemptID.getInstance(taId.getTaskID(), taId.getId()+1);
     TaskAttemptState recoveredState =
         ta.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
-            startTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
+            startTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", creationTime, causalId, 
+            allocationTime));
+    assertEquals(causalId, ta.getCreationCausalAttempt());
+    assertEquals(creationTime, ta.getCreationTime());
+    assertEquals(allocationTime, ta.getAllocationTime());
     assertEquals(startTime, ta.getLaunchTime());
     assertEquals(TaskAttemptState.RUNNING, recoveredState);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index 87e7498..1d22e06 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -215,7 +215,7 @@ public class TestTaskRecovery {
     long taStartTime = taskStartTime + 100L;
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
-            taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
+            taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(0, task.getFinishedAttemptsCount());
     assertEquals(taskScheduledTime, task.scheduledTime);
@@ -721,7 +721,7 @@ public class TestTaskRecovery {
     TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
-            taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
+            taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(TaskAttemptStateInternal.NEW,
         ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState());
@@ -774,7 +774,7 @@ public class TestTaskRecovery {
     for (int i = 0; i < maxFailedAttempts; ++i) {
       TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
       task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
-          mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
+          mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0));
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
           0, TaskAttemptState.KILLED, null, "", null, 0, null));
     }
@@ -804,7 +804,7 @@ public class TestTaskRecovery {
     for (int i = 0; i < maxFailedAttempts; ++i) {
       TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
       task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
-          mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
+          mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0));
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
           0, TaskAttemptState.FAILED, null, "", null, 0, null));
     }
@@ -834,7 +834,7 @@ public class TestTaskRecovery {
     for (int i = 0; i < maxFailedAttempts - 1; ++i) {
       TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
       task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
-          mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
+          mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0));
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
           0, TaskAttemptState.FAILED, null, "", null, 0, null));
     }
@@ -844,7 +844,7 @@ public class TestTaskRecovery {
     TezTaskAttemptID newTaskAttemptId = getNewTaskAttemptID(task.getTaskId());
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptStartedEvent(newTaskAttemptId,
-            vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
+            vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0));
 
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(TaskAttemptStateInternal.NEW,

http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index fafbba6..f9a1c5e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -108,11 +108,14 @@ public class TestAMContainer {
     assertNull(wc.amContainer.getCurrentTaskAttempt());
 
     // Assign task.
+    long currTime = wc.appContext.getClock().getTime();
     wc.assignTaskAttempt(wc.taskAttemptID);
     wc.verifyState(AMContainerState.LAUNCHING);
     wc.verifyNoOutgoingEvents();
     assertEquals(wc.taskAttemptID, wc.amContainer.getCurrentTaskAttempt());
-
+    assertTrue(wc.amContainer.getCurrentTaskAttemptAllocationTime() > 0);
+    assertTrue(wc.amContainer.getCurrentTaskAttemptAllocationTime() >= currTime);
+    
     // Container Launched
     wc.containerLaunched();
     wc.verifyState(AMContainerState.RUNNING);

http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index a32cc27..3507d99 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -480,7 +480,7 @@ public class TestHistoryEventsProtoConversion {
             ApplicationId.newInstance(0, 1), 1), 1001), NodeId.newInstance(
         "host1", 19999), "inProgress", "Completed", "nodeHttpAddress", 1024,
         TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-            TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 0)
+            TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 0), 1024
         );
     TaskAttemptStartedEvent deserializedEvent = (TaskAttemptStartedEvent)
         testProtoConversion(event);
@@ -492,10 +492,12 @@ public class TestHistoryEventsProtoConversion {
         deserializedEvent.getNodeId());
     Assert.assertEquals(event.getStartTime(),
         deserializedEvent.getStartTime());
-    Assert.assertEquals(event.getScheduledTime(),
-        deserializedEvent.getScheduledTime());
-    Assert.assertEquals(event.getSchedulingCausalTA(),
-        deserializedEvent.getSchedulingCausalTA());
+    Assert.assertEquals(event.getCreationTime(),
+        deserializedEvent.getCreationTime());
+    Assert.assertEquals(event.getAllocationTime(),
+        deserializedEvent.getAllocationTime());
+    Assert.assertEquals(event.getCreationCausalTA(),
+        deserializedEvent.getCreationCausalTA());
     logEvents(event, deserializedEvent);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index ec1603e..9c11dc7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -160,7 +160,7 @@ public class TestHistoryEventJsonConversion {
           break;
         case TASK_ATTEMPT_STARTED:
           event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
-              nodeId, null, null, "nodeHttpAddress", 0, null);
+              nodeId, null, null, "nodeHttpAddress", 0, null, 0);
           break;
         case TASK_ATTEMPT_FINISHED:
           event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),

http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
index 916df95..ba676a2 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
@@ -43,14 +43,16 @@ public class TaskAttemptInfo extends BaseInfo {
   private final long endTime;
   private final String diagnostics;
 
-  private final long scheduledTime;
+  private final long creationTime;
+  private final long allocationTime;
   private final String containerId;
   private final String nodeId;
   private final String status;
   private final String logUrl;
-  private final String schedulingCausalTA;
+  private final String creationCausalTA;
   private final long lastDataEventTime;
   private final String lastDataEventSourceTA;
+  private final String terminationCause;
 
   private TaskInfo taskInfo;
 
@@ -70,10 +72,10 @@ public class TaskAttemptInfo extends BaseInfo {
     startTime = otherInfoNode.optLong(Constants.START_TIME);
     endTime = otherInfoNode.optLong(Constants.FINISH_TIME);
     diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
-    scheduledTime = otherInfoNode.optLong(Constants.SCHEDULED_TIME);
-    schedulingCausalTA = StringInterner.weakIntern(
-        otherInfoNode.optString(Constants.SCHEDULING_CAUSAL_ATTEMPT));
-
+    creationTime = otherInfoNode.optLong(Constants.CREATION_TIME);
+    creationCausalTA = StringInterner.weakIntern(
+        otherInfoNode.optString(Constants.CREATION_CAUSAL_ATTEMPT));
+    allocationTime = otherInfoNode.optLong(Constants.ALLOCATION_TIME);
     containerId = StringInterner.weakIntern(otherInfoNode.optString(Constants.CONTAINER_ID));
     String id = otherInfoNode.optString(Constants.NODE_ID);
     nodeId = StringInterner.weakIntern((id != null) ? (id.split(":")[0]) : "");
@@ -84,6 +86,8 @@ public class TaskAttemptInfo extends BaseInfo {
     lastDataEventTime = otherInfoNode.optLong(ATSConstants.LAST_DATA_EVENT_TIME);
     lastDataEventSourceTA = StringInterner.weakIntern(
         otherInfoNode.optString(ATSConstants.LAST_DATA_EVENT_SOURCE_TA));
+    terminationCause = StringInterner
+        .weakIntern(otherInfoNode.optString(ATSConstants.TASK_ATTEMPT_ERROR_ENUM));
   }
 
   void setTaskInfo(TaskInfo taskInfo) {
@@ -110,8 +114,8 @@ public class TaskAttemptInfo extends BaseInfo {
     return endTime;
   }
 
-  public final long getScheduledTime() {
-    return scheduledTime;
+  public final long getCreationTime() {
+    return creationTime;
   }
   
   public final long getLastDataEventTime() {
@@ -126,19 +130,26 @@ public class TaskAttemptInfo extends BaseInfo {
     return getFinishTimeInterval() - getStartTimeInterval();
   }
 
-  public final long getScheduledTimeInterval() {
-    return scheduledTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime());
+  public final long getCreationTimeInterval() {
+    return creationTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime());
   }
   
-  public final String getSchedulingCausalTA() {
-    return schedulingCausalTA;
+  public final String getCreationCausalTA() {
+    return creationCausalTA;
   }
 
+  public final long getAllocationTime() {
+    return allocationTime;
+  }
 
   @Override
   public final String getDiagnostics() {
     return diagnostics;
   }
+  
+  public final String getTerminationCause() {
+    return terminationCause;
+  }
 
   public static TaskAttemptInfo create(JSONObject taskInfoObject) throws JSONException {
     return new TaskAttemptInfo(taskInfoObject);
@@ -254,7 +265,7 @@ public class TaskAttemptInfo extends BaseInfo {
     StringBuilder sb = new StringBuilder();
     sb.append("[");
     sb.append("taskAttemptId=").append(getTaskAttemptId()).append(", ");
-    sb.append("scheduledTime=").append(getScheduledTimeInterval()).append(", ");
+    sb.append("creationTime=").append(getCreationTimeInterval()).append(", ");
     sb.append("startTime=").append(getStartTimeInterval()).append(", ");
     sb.append("finishTime=").append(getFinishTimeInterval()).append(", ");
     sb.append("timeTaken=").append(getTimeTaken()).append(", ");

http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
index c89acb2..2b797a5 100644
--- a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
+++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
@@ -77,6 +77,7 @@ import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.apache.tez.tests.MiniTezClusterWithTimeline;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -297,8 +298,9 @@ public class TestHistoryParser {
           }
         }
         for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
-          assertTrue(attemptInfo.getStartTimeInterval() > 0);
-          assertTrue(attemptInfo.getScheduledTimeInterval() > 0);
+          assertTrue(attemptInfo.getCreationTime() > 0);
+          assertTrue(attemptInfo.getAllocationTime() > 0);
+          assertTrue(attemptInfo.getStartTime() > 0);
         }
       }
       assertTrue(vertexInfo.getLastTaskToFinish() != null);
@@ -389,13 +391,14 @@ public class TestHistoryParser {
         "TaskCounter_Tokenizer_OUTPUT_Summation", 20); //Same as above
 
     for (TaskInfo taskInfo : summationVertex.getTasks()) {
-      String lastAttemptId = null;
+      TaskAttemptInfo lastAttempt = null;
       for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
-        if (lastAttemptId != null) {
+        if (lastAttempt != null) {
           // failed attempt should be causal TA of next attempt
-          assertTrue(lastAttemptId.equals(attemptInfo.getSchedulingCausalTA()));
+          assertTrue(lastAttempt.getTaskAttemptId().equals(attemptInfo.getCreationCausalTA()));
+          assertTrue(lastAttempt.getTerminationCause() != null);
         }
-        lastAttemptId = attemptInfo.getTaskAttemptId();
+        lastAttempt = attemptInfo;
       }
     }
 
@@ -769,6 +772,8 @@ public class TestHistoryParser {
         .equals(TaskAttemptState.SUCCEEDED)) {
       assertTrue(attemptInfo.getStartTimeInterval() > 0);
       assertTrue(attemptInfo.getFinishTimeInterval() > 0);
+      assertTrue(attemptInfo.getCreationTime() > 0);
+      assertTrue(attemptInfo.getAllocationTime() > 0);
       assertTrue(attemptInfo.getStartTime() > 0);
       assertTrue(attemptInfo.getFinishTime() > 0);
       assertTrue(attemptInfo.getFinishTime() > attemptInfo.getStartTime());

http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index eaed115..b979402 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -471,10 +471,11 @@ public class HistoryEventTimelineConversion {
     atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress());
     atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString());
     atsEntity.addOtherInfo(ATSConstants.STATUS, TaskAttemptState.RUNNING.name());
-    atsEntity.addOtherInfo(ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
-    if (event.getSchedulingCausalTA() != null) {
-      atsEntity.addOtherInfo(ATSConstants.SCHEDULING_CAUSAL_ATTEMPT,
-          event.getSchedulingCausalTA().toString());
+    atsEntity.addOtherInfo(ATSConstants.CREATION_TIME, event.getCreationTime());
+    atsEntity.addOtherInfo(ATSConstants.ALLOCATION_TIME, event.getAllocationTime());
+    if (event.getCreationCausalTA() != null) {
+      atsEntity.addOtherInfo(ATSConstants.CREATION_CAUSAL_ATTEMPT,
+          event.getCreationCausalTA().toString());
     }
 
     return atsEntity;

http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 838d9d6..75828c3 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -165,7 +165,7 @@ public class TestHistoryEventTimelineConversion {
           break;
         case TASK_ATTEMPT_STARTED:
           event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
-              nodeId, null, null, "nodeHttpAddress", 0, null);
+              nodeId, null, null, "nodeHttpAddress", 0, null, 0);
           break;
         case TASK_ATTEMPT_FINISHED:
           event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
@@ -727,10 +727,11 @@ public class TestHistoryEventTimelineConversion {
   @Test(timeout = 5000)
   public void testConvertTaskAttemptStartedEvent() {
     long startTime = random.nextLong();
-    long scheduleTime = 1024;
+    long creationTime = 1024;
+    long allocationTime = 1024;
     TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1",
         startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress", 
-        scheduleTime, tezTaskAttemptID);
+        creationTime, tezTaskAttemptID, allocationTime);
 
     TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
     Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
@@ -776,8 +777,9 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertTrue(TaskAttemptState.RUNNING.name()
         .equals(timelineEntity.getOtherInfo().get(ATSConstants.STATUS)));
     Assert.assertEquals(tezTaskAttemptID.toString(), 
-        timelineEntity.getOtherInfo().get(ATSConstants.SCHEDULING_CAUSAL_ATTEMPT));
-    Assert.assertEquals(scheduleTime, timelineEntity.getOtherInfo().get(ATSConstants.SCHEDULED_TIME));
+        timelineEntity.getOtherInfo().get(ATSConstants.CREATION_CAUSAL_ATTEMPT));
+    Assert.assertEquals(creationTime, timelineEntity.getOtherInfo().get(ATSConstants.CREATION_TIME));
+    Assert.assertEquals(allocationTime, timelineEntity.getOtherInfo().get(ATSConstants.ALLOCATION_TIME));
   }
 
   @Test(timeout = 5000)