You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/09/17 01:25:22 UTC

git commit: TEZ-460. Fix TaskAttempt status update via heartbeat events (bikas)

Updated Branches:
  refs/heads/TEZ-398 e7b591d79 -> f9713195f


TEZ-460. Fix TaskAttempt status update via heartbeat events (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/f9713195
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/f9713195
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/f9713195

Branch: refs/heads/TEZ-398
Commit: f9713195f30043b04cb422214e1e6e43d7ac7a5a
Parents: e7b591d
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Sep 16 16:23:12 2013 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Sep 16 16:23:12 2013 -0700

----------------------------------------------------------------------
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  4 +-
 .../org/apache/tez/dag/app/dag/TaskAttempt.java | 38 ++++++++----
 .../dag/event/TaskAttemptEventStatusUpdate.java | 20 ++++--
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 65 +++++++-------------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 26 +++++---
 .../dag/app/speculate/DefaultSpeculator.java    |  6 +-
 ...ponentiallySmoothedTaskRuntimeEstimator.java |  4 +-
 .../speculate/LegacyTaskRuntimeEstimator.java   |  4 +-
 .../app/speculate/NullTaskRuntimesEngine.java   |  6 +-
 .../tez/dag/app/speculate/Speculator.java       |  4 +-
 .../tez/dag/app/speculate/SpeculatorEvent.java  | 10 +--
 .../dag/app/speculate/StartEndTimesBase.java    |  6 +-
 .../dag/app/speculate/TaskRuntimeEstimator.java |  6 +-
 13 files changed, 109 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 3c32d36..655119f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -54,7 +54,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputConsumable;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.rm.container.AMContainerImpl;
 import org.apache.tez.dag.app.rm.container.AMContainerTask;
@@ -269,7 +269,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
     taskHeartbeatHandler.progressing(taskAttemptId);
     pingContainerHeartbeatHandler(taskAttemptId);
-    TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
+    TaskAttemptStatusOld taskAttemptStatus = new TaskAttemptStatusOld();
     taskAttemptStatus.id = taskAttemptId;
     // Task sends the updated progress to the TT.
     taskAttemptStatus.progress = taskStatus.getProgress();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index a27c45b..af6c0f5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -19,9 +19,11 @@
 package org.apache.tez.dag.app.dag;
 
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -34,6 +36,30 @@ import org.apache.tez.dag.records.TezVertexID;
  * Read only view of TaskAttempt.
  */
 public interface TaskAttempt {
+  
+  public static class TaskAttemptStatus {
+    public TaskAttemptState state;
+    public DAGCounter localityCounter;
+    public float progress;
+    public TezCounters counters;
+
+    // insert these counters till they come natively from the task itself.
+    // HDFS-5098
+    private AtomicBoolean localitySet = new AtomicBoolean(false);
+    public void setLocalityCounter(DAGCounter localityCounter) {
+      if (!localitySet.get()) {
+        localitySet.set(true);
+        if (counters == null) {
+          counters = new TezCounters();
+        }
+        if (localityCounter != null) {
+          counters.findCounter(localityCounter).increment(1);
+          // TODO Maybe validate that the correct value is being set.
+        }
+      }
+    }
+  }
+  
   TezTaskAttemptID getID();
   TezTaskID getTaskID();
   TezVertexID getVertexID();
@@ -88,18 +114,6 @@ public interface TaskAttempt {
    */
   long getFinishTime();
   
-  /**
-   * @return The attempt's input ready time. If
-   * attempt's input is not ready yet, returns 0.
-   */
-  long getInputReadyTime();
-
-  /**
-   * @return The attempt's output ready time. If attempt's output is not 
-   * ready yet, returns 0.
-   */
-  long getOutputReadyTime();
-
   // TODO TEZDAG - remove all references to ShufflePort
   /**
    * @return the port shuffle is on.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
index b968fc6..3a8c489 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
@@ -25,18 +25,30 @@ import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.newapi.events.TaskStatusUpdateEvent;
 
 public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent {
+  
+  private TaskStatusUpdateEvent taskAttemptStatus;
+  
+  public TaskAttemptEventStatusUpdate(TezTaskAttemptID id, TaskStatusUpdateEvent statusEvent) {
+    super(id, TaskAttemptEventType.TA_STATUS_UPDATE);
+    this.taskAttemptStatus = statusEvent;
+  }
+  
+  public TaskStatusUpdateEvent getStatusEvent() {
+    return this.taskAttemptStatus;
+  }
 
-  private TaskAttemptStatus reportedTaskAttemptStatus;
+  private TaskAttemptStatusOld reportedTaskAttemptStatus;
 
   public TaskAttemptEventStatusUpdate(TezTaskAttemptID id,
-      TaskAttemptStatus taskAttemptStatus) {
+      TaskAttemptStatusOld taskAttemptStatus) {
     super(id, TaskAttemptEventType.TA_STATUS_UPDATE);
     this.reportedTaskAttemptStatus = taskAttemptStatus;
   }
 
-  public TaskAttemptStatus getReportedTaskAttemptStatus() {
+  public TaskAttemptStatusOld getReportedTaskAttemptStatus() {
     return reportedTaskAttemptStatus;
   }
 
@@ -44,7 +56,7 @@ public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent {
    * The internal TaskAttemptStatus object corresponding to remote Task status.
    * 
    */
-  public static class TaskAttemptStatus {
+  public static class TaskAttemptStatusOld {
     
     private AtomicBoolean localitySet = new AtomicBoolean(false);
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 30bb1eb..ef51651 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -77,11 +77,9 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputConsumable;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
-import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptFetchFailure;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
 import org.apache.tez.dag.history.DAGHistoryEvent;
@@ -92,6 +90,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.TezBuilderUtils;
+import org.apache.tez.engine.newapi.events.TaskStatusUpdateEvent;
 import org.apache.tez.engine.newapi.impl.TaskSpec;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -132,7 +131,6 @@ public class TaskAttemptImpl implements TaskAttempt,
   private String nodeRackName;
 
   private TaskAttemptStatus reportedStatus;
-  private DAGCounter localityCounter;
 
   // Used to store locality information when
   Set<String> taskHosts = new HashSet<String>();
@@ -316,10 +314,10 @@ public class TaskAttemptImpl implements TaskAttempt,
       result.setProgress(reportedStatus.progress);
       result.setStartTime(launchTime);
       result.setFinishTime(finishTime);
-      result.setShuffleFinishTime(this.reportedStatus.shuffleFinishTime);
+      //result.setShuffleFinishTime(this.reportedStatus.shuffleFinishTime);
       result.setDiagnosticInfo(StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
       //result.setPhase(reportedStatus.phase);
-      result.setStateString(reportedStatus.stateString);
+      //result.setStateString(reportedStatus.stateString);
       result.setCounters(getCounters());
       result.setContainerId(this.getAssignedContainerID());
       result.setNodeManagerHost(trackerName);
@@ -349,7 +347,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   public TezCounters getCounters() {
     readLock.lock();
     try {
-      reportedStatus.setLocalityCounter(localityCounter);
+      reportedStatus.setLocalityCounter(reportedStatus.localityCounter);
       TezCounters counters = reportedStatus.counters;
       if (counters == null) {
         counters = EMPTY_COUNTERS;
@@ -471,26 +469,6 @@ public class TaskAttemptImpl implements TaskAttempt,
   }
 
   @Override
-  public long getInputReadyTime() {
-    readLock.lock();
-    try {
-      return this.reportedStatus.shuffleFinishTime;
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  @Override
-  public long getOutputReadyTime() {
-    readLock.lock();
-    try {
-      return this.reportedStatus.sortFinishTime;
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  @Override
   public int getShufflePort() {
     readLock.lock();
     try {
@@ -980,15 +958,16 @@ public class TaskAttemptImpl implements TaskAttempt,
       // JobHistoryEvent
       ta.logJobHistoryAttemptStarted();
 
-      // Compute LOLCAITY counter for this task.
+      // TODO Remove after HDFS-5098
+      // Compute LOCALITY counter for this task.
       if (ta.taskHosts.contains(ta.containerNodeId.getHost())) {
-        ta.localityCounter = DAGCounter.DATA_LOCAL_TASKS;
+        ta.reportedStatus.localityCounter = DAGCounter.DATA_LOCAL_TASKS;
       } else if (ta.taskRacks.contains(ta.nodeRackName)) {
-        ta.localityCounter = DAGCounter.RACK_LOCAL_TASKS;
+        ta.reportedStatus.localityCounter = DAGCounter.RACK_LOCAL_TASKS;
       } else {
         // Not computing this if the task does not have locality information.
         if (ta.locationHint != null) {
-          ta.localityCounter = DAGCounter.OTHER_LOCAL_TASKS;
+          ta.reportedStatus.localityCounter = DAGCounter.OTHER_LOCAL_TASKS;
         }
       }
 
@@ -1076,22 +1055,24 @@ public class TaskAttemptImpl implements TaskAttempt,
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @Override
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
-      TaskAttemptStatus newReportedStatus = ((TaskAttemptEventStatusUpdate) event)
-          .getReportedTaskAttemptStatus();
-      ta.reportedStatus = newReportedStatus;
-      ta.reportedStatus.taskState = ta.getState();
+      TaskStatusUpdateEvent statusEvent = ((TaskAttemptEventStatusUpdate) event)
+          .getStatusEvent();
+      ta.reportedStatus.state = ta.getState();
+      ta.reportedStatus.progress = statusEvent.getProgress();
+      ta.reportedStatus.counters = statusEvent.getCounters();
 
       // Inform speculator of status.
       //ta.sendEvent(new SpeculatorEvent(ta.reportedStatus, ta.clock.getTime()));
 
       ta.updateProgressSplits();
 
-      // Inform the job about fetch failures if they exist.
-      if (ta.reportedStatus.fetchFailedMaps != null
-          && ta.reportedStatus.fetchFailedMaps.size() > 0) {
-        ta.sendEvent(new VertexEventTaskAttemptFetchFailure(ta.attemptId,
-            ta.reportedStatus.fetchFailedMaps));
-      }
+      // TODO TEZ-431
+//      // Inform the job about fetch failures if they exist.
+//      if (ta.reportedStatus.fetchFailedMaps != null
+//          && ta.reportedStatus.fetchFailedMaps.size() > 0) {
+//        ta.sendEvent(new VertexEventTaskAttemptFetchFailure(ta.attemptId,
+//            ta.reportedStatus.fetchFailedMaps));
+//      }
       // TODO at some point. Nodes may be interested in FetchFailure info.
       // Can be used to blacklist nodes.
     }
@@ -1217,8 +1198,8 @@ public class TaskAttemptImpl implements TaskAttempt,
   private void initTaskAttemptStatus(TaskAttemptStatus result) {
     result.progress = 0.0f;
     // result.phase = Phase.STARTING;
-    result.stateString = "NEW";
-    result.taskState = TaskAttemptState.NEW;
+    //result.stateString = "NEW";
+    result.state = TaskAttemptState.NEW;
     //TezCounters counters = EMPTY_COUNTERS;
     //result.counters = counters;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 6e6e109..845201c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -84,6 +84,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
@@ -106,6 +107,7 @@ import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.engine.newapi.events.DataMovementEvent;
 import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.newapi.events.TaskStatusUpdateEvent;
 import org.apache.tez.engine.newapi.impl.EventMetaData;
 import org.apache.tez.engine.newapi.impl.InputSpec;
 import org.apache.tez.engine.newapi.impl.OutputSpec;
@@ -1418,6 +1420,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private void addDiagnostic(String diag) {
     diagnostics.add(diag);
   }
+  
+  private static void checkEventSourceMetadata(Vertex vertex, EventMetaData sourceMeta) {
+    if (!sourceMeta.getTaskVertexName().equals(vertex.getName())) {
+      throw new TezUncheckedException(
+          "Bad routing of event. Event-vertex: "
+              + sourceMeta.getTaskVertexName() + " Expected: "
+              + vertex.getName());
+    }
+  }
 
   private static class RouteEventTransition  implements
   SingleArcTransition<VertexImpl, VertexEvent> {
@@ -1426,14 +1437,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       VertexEventRouteEvent rEvent = (VertexEventRouteEvent) event;
       List<TezEvent> tezEvents = rEvent.getEvents();
       for(TezEvent tezEvent : tezEvents) {
+        EventMetaData sourceMeta = tezEvent.getSourceInfo();
+        checkEventSourceMetadata(vertex, sourceMeta);
         switch(tezEvent.getEventType()) {
         case DATA_MOVEMENT_EVENT:
           {
-            EventMetaData sourceMeta = tezEvent.getSourceInfo();
             TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
             DataMovementEvent dmEvent = (DataMovementEvent) tezEvent.getEvent();
             dmEvent.setVersion(srcTaId.getId());
-            assert sourceMeta.getTaskVertexName().equals(vertex.getName());
             Edge destEdge = vertex.targetVertices.get(vertex.getDAG().getVertex(
                 sourceMeta.getEdgeVertexName()));
             destEdge.sendTezEventToDestinationTasks(tezEvent);
@@ -1441,11 +1452,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           break;
         case INPUT_FAILED_EVENT:
         {
-          EventMetaData sourceMeta = tezEvent.getSourceInfo();
           TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
           InputFailedEvent ifEvent = (InputFailedEvent) tezEvent.getEvent();
           ifEvent.setVersion(srcTaId.getId());
-          assert sourceMeta.getTaskVertexName().equals(vertex.getName());
           Edge destEdge = vertex.targetVertices.get(vertex.getDAG().getVertex(
               sourceMeta.getEdgeVertexName()));
           destEdge.sendTezEventToDestinationTasks(tezEvent);
@@ -1453,15 +1462,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         break;
         case INPUT_READ_ERROR_EVENT:
           {
-            EventMetaData sourceMeta = tezEvent.getSourceInfo();
-            assert sourceMeta.getTaskVertexName().equals(vertex.getName());
             Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(
                 sourceMeta.getEdgeVertexName()));
             srcEdge.sendTezEventToSourceTasks(tezEvent);
           }
           break;
         case TASK_STATUS_UPDATE_EVENT:
-          // TODO NEWTEZ FIXME: Handle this event
+          {
+            TaskStatusUpdateEvent sEvent = (TaskStatusUpdateEvent) tezEvent.getEvent();
+            vertex.getEventHandler().handle(
+                new TaskAttemptEventStatusUpdate(sourceMeta.getTaskAttemptID(),
+                    sEvent));
+          }
           break;
         default:
           throw new TezUncheckedException("Unhandled tez event type: "

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
index 4dea478..24948c0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
@@ -43,7 +43,7 @@ import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -209,7 +209,7 @@ public class DefaultSpeculator extends AbstractService implements
   }
 
   @Override
-  public void handleAttempt(TaskAttemptStatus status) {
+  public void handleAttempt(TaskAttemptStatusOld status) {
     long timestamp = clock.getTime();
     statusUpdate(status, timestamp);
   }
@@ -287,7 +287,7 @@ public class DefaultSpeculator extends AbstractService implements
    * @param timestamp the time this status corresponds to.  This matters
    *        because statuses contain progress.
    */
-  protected void statusUpdate(TaskAttemptStatus reportedStatus, long timestamp) {
+  protected void statusUpdate(TaskAttemptStatusOld reportedStatus, long timestamp) {
 
     String stateString = reportedStatus.taskState.toString();
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
index 558f78c..10f217d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 
@@ -181,7 +181,7 @@ public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase
   }
 
   @Override
-  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+  public void updateAttempt(TaskAttemptStatusOld status, long timestamp) {
     super.updateAttempt(status, timestamp);
     TezTaskAttemptID attemptID = status.id;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
index b7ebc68..ff7564c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
@@ -27,7 +27,7 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 
@@ -39,7 +39,7 @@ public class LegacyTaskRuntimeEstimator extends StartEndTimesBase {
       = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
 
   @Override
-  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+  public void updateAttempt(TaskAttemptStatusOld status, long timestamp) {
     super.updateAttempt(status, timestamp);
 
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java
index 9907bc6..9fa3b4b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java
@@ -20,7 +20,7 @@ package org.apache.tez.dag.app.speculate;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 
@@ -31,7 +31,7 @@ import org.apache.tez.dag.records.TezTaskID;
  */
 public class NullTaskRuntimesEngine implements TaskRuntimeEstimator {
   @Override
-  public void enrollAttempt(TaskAttemptStatus status, long timestamp) {
+  public void enrollAttempt(TaskAttemptStatusOld status, long timestamp) {
     // no code
   }
 
@@ -41,7 +41,7 @@ public class NullTaskRuntimesEngine implements TaskRuntimeEstimator {
   }
 
   @Override
-  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+  public void updateAttempt(TaskAttemptStatusOld status, long timestamp) {
     // no code
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java
index 61d9d48..d4d0b5a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java
@@ -19,7 +19,7 @@
 package org.apache.tez.dag.app.speculate;
 
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
 
 /**
  * Speculator component. Task Attempts' status updates are sent to this
@@ -41,5 +41,5 @@ public interface Speculator
 
   // This will be implemented if we go to a model where the events are
   //  processed within the TaskAttempts' state transitions' code.
-  public void handleAttempt(TaskAttemptStatus status);
+  public void handleAttempt(TaskAttemptStatusOld status);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java
index c8f400b..917abb6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java
@@ -19,7 +19,7 @@
 package org.apache.tez.dag.app.speculate;
 
 import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -27,7 +27,7 @@ import org.apache.tez.dag.records.TezTaskID;
 public class SpeculatorEvent extends AbstractEvent<Speculator.EventType> {
 
   // valid for ATTEMPT_STATUS_UPDATE
-  private TaskAttemptStatus reportedStatus;
+  private TaskAttemptStatusOld reportedStatus;
 
   // valid for TASK_CONTAINER_NEED_UPDATE
   private TezTaskID taskID;
@@ -41,14 +41,14 @@ public class SpeculatorEvent extends AbstractEvent<Speculator.EventType> {
     this.dagId = dagId;
   }
 
-  public SpeculatorEvent(TaskAttemptStatus reportedStatus, long timestamp) {
+  public SpeculatorEvent(TaskAttemptStatusOld reportedStatus, long timestamp) {
     super(Speculator.EventType.ATTEMPT_STATUS_UPDATE, timestamp);
     this.reportedStatus = reportedStatus;
   }
 
   public SpeculatorEvent(TezTaskAttemptID attemptID, boolean flag, long timestamp) {
     super(Speculator.EventType.ATTEMPT_START, timestamp);
-    this.reportedStatus = new TaskAttemptStatus();
+    this.reportedStatus = new TaskAttemptStatusOld();
     this.reportedStatus.id = attemptID;
     this.taskID = attemptID.getTaskID();
   }
@@ -68,7 +68,7 @@ public class SpeculatorEvent extends AbstractEvent<Speculator.EventType> {
     this.containersNeededChange = containersNeededChange;
   }
 
-  public TaskAttemptStatus getReportedStatus() {
+  public TaskAttemptStatusOld getReportedStatus() {
     return reportedStatus;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
index 8d36f28..68d1369 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
@@ -32,7 +32,7 @@ import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
@@ -66,7 +66,7 @@ abstract class StartEndTimesBase<V> implements TaskRuntimeEstimator {
   protected final Set<Task> doneTasks = new HashSet<Task>();
 
   @Override
-  public void enrollAttempt(TaskAttemptStatus status, long timestamp) {
+  public void enrollAttempt(TaskAttemptStatusOld status, long timestamp) {
     startTimes.put(status.id,timestamp);
   }
 
@@ -142,7 +142,7 @@ abstract class StartEndTimesBase<V> implements TaskRuntimeEstimator {
   }
 
   @Override
-  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+  public void updateAttempt(TaskAttemptStatusOld status, long timestamp) {
 
     TezTaskAttemptID attemptID = status.id;
     TezTaskID taskID = attemptID.getTaskID();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java
index 965d965..a68dc50 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java
@@ -20,18 +20,18 @@ package org.apache.tez.dag.app.speculate;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 
 
 
 public interface TaskRuntimeEstimator {
-  public void enrollAttempt(TaskAttemptStatus reportedStatus, long timestamp);
+  public void enrollAttempt(TaskAttemptStatusOld reportedStatus, long timestamp);
 
   public long attemptEnrolledTime(TezTaskAttemptID attemptID);
 
-  public void updateAttempt(TaskAttemptStatus reportedStatus, long timestamp);
+  public void updateAttempt(TaskAttemptStatusOld reportedStatus, long timestamp);
 
   public void contextualize(Configuration conf, AppContext context);