You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/11/19 20:44:34 UTC

[2/2] tez git commit: TEZ-14. Support MR like speculation capabilities based on latency deviation from the mean (bikas)

TEZ-14. Support MR like speculation capabilities based on latency deviation from the mean (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6be75661
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6be75661
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6be75661

Branch: refs/heads/master
Commit: 6be7566142a657a3dbcfc262af9c55546da45728
Parents: c56bb01
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Nov 19 11:44:27 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Nov 19 11:44:27 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/tez/dag/api/TezConfiguration.java    |  16 +-
 .../org/apache/tez/dag/app/dag/TaskAttempt.java |   6 +-
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   1 +
 .../dag/event/TaskAttemptEventKillRequest.java  |   7 +-
 .../dag/event/TaskAttemptEventStatusUpdate.java |  53 ---
 .../VertexEventTaskAttemptStatusUpdate.java     |  60 +++
 .../tez/dag/app/dag/event/VertexEventType.java  |   3 +
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |   5 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  31 +-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  23 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  61 ++-
 .../dag/speculation/legacy/DataStatistics.java  |  86 ++++
 .../speculation/legacy/LegacySpeculator.java    | 396 +++++++++++++++++++
 .../legacy/LegacyTaskRuntimeEstimator.java      | 136 +++++++
 .../speculation/legacy/StartEndTimesBase.java   | 138 +++++++
 .../legacy/TaskRuntimeEstimator.java            |  91 +++++
 .../java/org/apache/tez/dag/app/MockClock.java  |  36 ++
 .../apache/tez/dag/app/MockDAGAppMaster.java    |  36 +-
 .../org/apache/tez/dag/app/MockLocalClient.java |   6 +-
 .../org/apache/tez/dag/app/MockTezClient.java   |   5 +-
 .../tez/dag/app/TestMockDAGAppMaster.java       |   8 +-
 .../org/apache/tez/dag/app/TestPreemption.java  |   4 +-
 .../org/apache/tez/dag/app/TestSpeculation.java | 161 ++++++++
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 195 ++++++++-
 .../speculation/legacy/TestDataStatistics.java  |  73 ++++
 .../org/apache/tez/test/TestDAGRecovery.java    |   2 +-
 .../apache/tez/test/dag/MultiAttemptDAG.java    |   1 +
 28 files changed, 1557 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3c8c676..2eaf873 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Release 0.6.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-14. Support MR like speculation capabilities based on latency deviation
+  from the mean
   TEZ-1733. TezMerger should sort FileChunks on size when merging
   TEZ-1738. Tez tfile parser for log parsing
   TEZ-1627. Remove OUTPUT_CONSUMABLE and related Event in TaskAttemptImpl

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 6873863..84ee906 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -272,7 +272,21 @@ public class TezConfiguration extends Configuration {
       TEZ_PREFIX + "counters.group-name.max-length";
   public static final int TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH_DEFAULT = 128;
 
-
+  @Unstable
+  /**
+   * Boolean value. Enable speculative execution of slower tasks. This can help reduce job latency 
+   * when some tasks are running slower due to bad/slow machines
+   */
+  public static final String TEZ_AM_SPECULATION_ENABLED = TEZ_AM_PREFIX + "speculation.enabled";
+  public static final boolean TEZ_AM_SPECULATION_ENABLED_DEFAULT = false;
+  
+  /**
+   * Float value. Specifies how many standard deviations away from the mean task execution time 
+   * should be considered as an outlier/slow task.
+   */
+  @Unstable
+  public static final String TEZ_AM_LEGACY_SPECULATIVE_SLOWTASK_THRESHOLD = 
+                                     TEZ_AM_PREFIX + "legacy.speculative.slowtask.threshold";
 
   /**
    * Int value. Upper limit on the number of threads user to launch containers in the app

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index f30fc5c..4aa220d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -40,10 +40,14 @@ import org.apache.tez.dag.records.TezVertexID;
 public interface TaskAttempt {
 
   public static class TaskAttemptStatus {
+    public TezTaskAttemptID id;
     public TaskAttemptState state;
-    public DAGCounter localityCounter;
     public float progress;
     public TezCounters counters;
+    
+    public TaskAttemptStatus(TezTaskAttemptID id) {
+      this.id = id;
+    }
 
     // insert these counters till they come natively from the task itself.
     // HDFS-5098

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index cfedc41..7487fd9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -124,6 +124,7 @@ public interface Vertex extends Comparable<Vertex> {
   int getInputVerticesCount();
   int getOutputVerticesCount();
   void scheduleTasks(List<TaskWithLocationHint> tasks);
+  void scheduleSpeculativeTask(TezTaskID taskId);
   Resource getTaskResource();
 
   ProcessorDescriptor getProcessorDescriptor();

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
index 9bceb1d..0205fcf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
@@ -19,7 +19,7 @@ package org.apache.tez.dag.app.dag.event;
 
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
-public class TaskAttemptEventKillRequest extends TaskAttemptEvent {
+public class TaskAttemptEventKillRequest extends TaskAttemptEvent implements DiagnosableEvent {
 
   private final String message;
 
@@ -28,8 +28,9 @@ public class TaskAttemptEventKillRequest extends TaskAttemptEvent {
     this.message = message;
   }
 
-  public String getMessage() {
-    return this.message;
+  @Override
+  public String getDiagnosticInfo() {
+    return message;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
index 13577c5..c5a6ea7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
@@ -18,12 +18,6 @@
 
 package org.apache.tez.dag.app.dag.event;
 
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.tez.common.counters.DAGCounter;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 
@@ -40,51 +34,4 @@ public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent {
   public TaskStatusUpdateEvent getStatusEvent() {
     return this.taskAttemptStatus;
   }
-
-  private TaskAttemptStatusOld reportedTaskAttemptStatus;
-
-  public TaskAttemptEventStatusUpdate(TezTaskAttemptID id,
-      TaskAttemptStatusOld taskAttemptStatus) {
-    super(id, TaskAttemptEventType.TA_STATUS_UPDATE);
-    this.reportedTaskAttemptStatus = taskAttemptStatus;
-  }
-
-  public TaskAttemptStatusOld getReportedTaskAttemptStatus() {
-    return reportedTaskAttemptStatus;
-  }
-
-  /**
-   * The internal TaskAttemptStatus object corresponding to remote Task status.
-   * 
-   */
-  public static class TaskAttemptStatusOld {
-    
-    private AtomicBoolean localitySet = new AtomicBoolean(false);
-
-    public TezTaskAttemptID id;
-    public float progress;
-    public TezCounters counters;
-    public String stateString;
-    //public Phase phase;
-    public long outputSize;
-    public List<TezTaskAttemptID> fetchFailedMaps;
-    public long mapFinishTime;
-    public long shuffleFinishTime;
-    public long sortFinishTime;
-    public TaskAttemptState taskState;
-
-    public void setLocalityCounter(DAGCounter localityCounter) {
-      if (!localitySet.get()) {
-        localitySet.set(true);
-        if (counters == null) {
-          counters = new TezCounters();
-        }
-        if (localityCounter != null) {
-          counters.findCounter(localityCounter).increment(1);
-          // TODO Maybe validate that the correct value is being set.
-        }
-      }
-    }
-
-  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptStatusUpdate.java
new file mode 100644
index 0000000..696680d
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptStatusUpdate.java
@@ -0,0 +1,60 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+public class VertexEventTaskAttemptStatusUpdate extends VertexEvent {
+  final TezTaskAttemptID id;
+  final TaskAttemptState state;
+  final long timestamp;
+  final boolean justStarted;
+  
+  public VertexEventTaskAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState state,
+      long timestamp) {
+    this(taId, state, timestamp, false);
+  }
+  
+  public VertexEventTaskAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState state,
+      long timestamp, boolean justStarted) {
+    super(taId.getTaskID().getVertexID(), VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE);
+    this.id = taId;
+    this.state = state;
+    this.timestamp = timestamp;
+    this.justStarted = justStarted;
+  }
+  
+  public long getTimestamp() {
+    return timestamp;
+  }
+  
+  public TezTaskAttemptID getAttemptId() {
+    return id;
+  }
+  
+  public boolean hasJustStarted() {
+    return justStarted;
+  }
+  
+  public TaskAttemptState getTaskAttemptState() {
+    return state;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index b4f7e29..5565f93 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -40,6 +40,9 @@ public enum VertexEventType {
   V_TASK_RESCHEDULED,
   V_TASK_ATTEMPT_COMPLETED,
   
+  //Producer:TaskAttempt
+  V_TASK_ATTEMPT_STATUS_UPDATE,
+  
   //Producer:Any component
   V_INTERNAL_ERROR,
   V_MANAGER_USER_CODE_ERROR,

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index de62752..6e7805e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -111,6 +111,7 @@ import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption;
 import org.apache.tez.dag.utils.RelocalizationUtils;
@@ -768,6 +769,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         .getAttempt(taId);
   }
 
+  public TaskImpl getTask(TezTaskID tId) {
+    return (TaskImpl) getVertex(tId.getVertexID()).getTask(tId);
+  }
+
   protected void initializeVerticesAndStart() {
     for (Vertex v : vertices.values()) {
       if (v.getInputVerticesCount() == 0) {

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index deaba42..3056c1e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.util.Records;
 import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
@@ -83,6 +84,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptStatusUpdate;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
 import org.apache.tez.dag.history.DAGHistoryEvent;
@@ -413,7 +415,7 @@ public class TaskAttemptImpl implements TaskAttempt,
     this.clock = clock;
     this.taskHeartbeatHandler = taskHeartbeatHandler;
     this.appContext = appContext;
-    this.reportedStatus = new TaskAttemptStatus();
+    this.reportedStatus = new TaskAttemptStatus(this.attemptId);
     initTaskAttemptStatus(reportedStatus);
     RackResolver.init(conf);
     this.stateMachine = stateMachineFactory.make(this);
@@ -1151,10 +1153,20 @@ public class TaskAttemptImpl implements TaskAttempt,
       // Inform the Task
       ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
           TaskEventType.T_ATTEMPT_LAUNCHED));
+      
+      if (ta.isSpeculationEnabled()) {
+        ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, TaskAttemptState.RUNNING,
+            ta.launchTime, true));
+      }
 
       ta.taskHeartbeatHandler.register(ta.attemptId);
     }
   }
+  
+  private boolean isSpeculationEnabled() {
+    return conf.getBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED,
+        TezConfiguration.TEZ_AM_SPECULATION_ENABLED_DEFAULT);
+  }
 
   protected static class TerminatedBeforeRunningTransition extends
       TerminateTransition {
@@ -1235,6 +1247,10 @@ public class TaskAttemptImpl implements TaskAttempt,
 
       ta.updateProgressSplits();
 
+      if (ta.isSpeculationEnabled()) {
+        ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, ta.getState(),
+            ta.clock.getTime()));
+      }
     }
   }
 
@@ -1259,6 +1275,14 @@ public class TaskAttemptImpl implements TaskAttempt,
 
       // Unregister from the TaskHeartbeatHandler.
       ta.taskHeartbeatHandler.unregister(ta.attemptId);
+      
+      ta.reportedStatus.state = TaskAttemptState.SUCCEEDED;
+      ta.reportedStatus.progress = 1.0f;
+      
+      if (ta.isSpeculationEnabled()) {
+        ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, TaskAttemptState.SUCCEEDED,
+            ta.clock.getTime()));
+      }
 
       // TODO maybe. For reuse ... Stacking pulls for a reduce task, even if the
       // TA finishes independently. // Will likely be the Job's responsibility.
@@ -1278,6 +1302,11 @@ public class TaskAttemptImpl implements TaskAttempt,
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
       super.transition(ta, event);
       ta.taskHeartbeatHandler.unregister(ta.attemptId);
+      ta.reportedStatus.state = helper.getTaskAttemptState(); // FAILED or KILLED
+      if (ta.isSpeculationEnabled()) {
+        ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, helper.getTaskAttemptState(),
+            ta.clock.getTime()));
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 4ded9be..c3ba11d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -1016,6 +1015,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       if (task.historyTaskStartGenerated) {
         task.logJobHistoryTaskFinishedEvent();
       }
+      TaskAttempt successfulAttempt = task.attempts.get(successTaId);
 
       // issue kill to all other attempts
       for (TaskAttempt attempt : task.attempts.values()) {
@@ -1024,9 +1024,18 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
             //  TA_KILL message to an attempt that doesn't need one for
             //  other reasons.
             !attempt.isFinished()) {
-          LOG.info("Issuing kill to other attempt " + attempt.getID());
+          LOG.info("Issuing kill to other attempt " + attempt.getID() + " as attempt: " +
+            task.successfulAttempt + " has succeeded");
+          String diagnostics = null;
+          if (attempt.getLaunchTime() < successfulAttempt.getLaunchTime()) {
+            diagnostics = "Killed this attempt as other speculative attempt : " + successTaId
+                + " succeeded";
+          } else {
+            diagnostics = "Killed this speculative attempt as original attempt: " + successTaId
+                + " succeeded";
+          }
           task.eventHandler.handle(new TaskAttemptEventKillRequest(attempt
-              .getID(), "Alternate attempt succeeded"));
+              .getID(), diagnostics));
         }
       }
       // send notification to DAG scheduler
@@ -1336,12 +1345,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
-      // verify that this occurs only for map task
-      // TODO: consider moving it to MapTaskImpl
-      if (task.leafVertex) {
-        LOG.error("Unexpected event for task of leaf vertex " + event.getType());
-        task.internalError(event.getType());
-      }
 
       TaskEventTAUpdate attemptEvent = (TaskEventTAUpdate) event;
       TezTaskAttemptID attemptId = attemptEvent.getTaskAttemptID();
@@ -1365,6 +1368,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         return TaskStateInternal.SCHEDULED;
       } else {
         // nothing to do
+        LOG.info("Ignoring kill of attempt: " + attemptId + " because attempt: " +
+                 task.successfulAttempt + " is already successful");
         return TaskStateInternal.SUCCEEDED;
       }
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index d19c4cc..54cd6c4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -68,6 +68,7 @@ import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.TaskLocationHint;
@@ -121,11 +122,13 @@ import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexRecovered;
 import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
+import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptStatusUpdate;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
 import org.apache.tez.dag.app.dag.event.VertexEventTermination;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
+import org.apache.tez.dag.app.dag.speculation.legacy.LegacySpeculator;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
@@ -205,6 +208,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private Resource taskResource;
 
   private Configuration conf;
+  
+  private final boolean isSpeculationEnabled;
 
   //fields initialized in init
 
@@ -235,6 +240,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private static final TaskAttemptCompletedEventTransition
       TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
           new TaskAttemptCompletedEventTransition();
+  private static final TaskAttempStatusUpdateEventTransition
+      TASK_ATTEMPT_STATUS_UPDATE_EVENT_TRANSITION = new TaskAttempStatusUpdateEventTransition();
   private static final SourceTaskAttemptCompletedEventTransition
       SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
           new SourceTaskAttemptCompletedEventTransition();
@@ -248,6 +255,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   @VisibleForTesting
   final List<TezEvent> pendingInitializerEvents = new LinkedList<TezEvent>();
+  
+  LegacySpeculator speculator;
 
   protected static final
     StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent>
@@ -460,6 +469,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               EnumSet.of(VertexState.RUNNING, VertexState.TERMINATING),
               VertexEventType.V_ROUTE_EVENT,
               ROUTE_EVENT_TRANSITION)
+          .addTransition(
+              VertexState.RUNNING,
+              VertexState.RUNNING, VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
+              TASK_ATTEMPT_STATUS_UPDATE_EVENT_TRANSITION)
 
           // Transitions from TERMINATING state.
           .addTransition
@@ -477,6 +490,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_MANAGER_USER_CODE_ERROR,
                   VertexEventType.V_ROOT_INPUT_FAILED,
                   VertexEventType.V_SOURCE_VERTEX_STARTED,
+                  VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
                   VertexEventType.V_ROOT_INPUT_INITIALIZED,
                   VertexEventType.V_NULL_EDGE_INITIALIZED,
                   VertexEventType.V_ROUTE_EVENT,
@@ -494,7 +508,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               VertexEventType.V_TASK_RESCHEDULED,
               new TaskRescheduledAfterVertexSuccessTransition())
 
-          // Ignore-able events
           .addTransition(
               VertexState.SUCCEEDED,
               EnumSet.of(VertexState.SUCCEEDED, VertexState.FAILED),
@@ -506,10 +519,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               EnumSet.of(VertexState.FAILED, VertexState.ERROR),
               VertexEventType.V_TASK_COMPLETED,
               new TaskCompletedAfterVertexSuccessTransition())
+          // Ignore-able events
           .addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
               EnumSet.of(VertexEventType.V_TERMINATE,
                   VertexEventType.V_ROOT_INPUT_FAILED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+                  VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
                   // after we are done reruns of source tasks should not affect
                   // us. These reruns may be triggered by other consumer vertices.
                   // We should have been in RUNNING state if we had triggered the
@@ -519,6 +534,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               VertexEventType.V_TASK_ATTEMPT_COMPLETED,
               new TaskAttemptCompletedEventTransition())
 
+
           // Transitions from FAILED state
           .addTransition(
               VertexState.FAILED,
@@ -534,6 +550,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_START,
                   VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+                  VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
                   VertexEventType.V_TASK_COMPLETED,
                   VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
                   VertexEventType.V_ROOT_INPUT_INITIALIZED,
@@ -558,6 +575,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_TASK_RESCHEDULED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+                  VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
                   VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_COMPLETED,
@@ -577,6 +595,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_TERMINATE,
                   VertexEventType.V_MANAGER_USER_CODE_ERROR,
+                  VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
                   VertexEventType.V_TASK_COMPLETED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
@@ -773,7 +792,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     // Not sending the notifier a parallelism update since this is the initial parallelism
 
     this.dagVertexGroups = dagVertexGroups;
-
+    
+    isSpeculationEnabled = conf.getBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED,
+        TezConfiguration.TEZ_AM_SPECULATION_ENABLED_DEFAULT);
+    
+    if (isSpeculationEnabled()) {
+      speculator = new LegacySpeculator(conf, getAppContext(), this);
+    }
+    
     logIdentifier =  this.getVertexId() + " [" + this.getName() + "]";
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
@@ -782,6 +808,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         stateMachineFactory.make(this), this);
     augmentStateMachine();
   }
+  
+  private boolean isSpeculationEnabled() {
+    return isSpeculationEnabled;
+  }
 
   protected StateMachine<VertexState, VertexEventType, VertexEvent> getStateMachine() {
     return stateMachine;
@@ -1194,6 +1224,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   @Override
+  public void scheduleSpeculativeTask(TezTaskID taskId) {
+    Preconditions.checkState(taskId.getId() < numTasks);
+    eventHandler.handle(new TaskEvent(taskId, TaskEventType.T_ADD_SPEC_ATTEMPT));
+  }
+  
+  @Override
   public void scheduleTasks(List<TaskWithLocationHint> tasksToSchedule) {
     writeLock.lock();
     try {
@@ -3282,6 +3318,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       eventHandler.handle(new VertexEvent(
         this.vertexId, VertexEventType.V_COMPLETED));
     }
+    
     return VertexState.RUNNING;
   }
 
@@ -3483,7 +3520,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
             LOG.error(msg, e);
             vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace(e.getCause()));
-            vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, TaskTerminationCause.AM_USERCODE_FAILURE);
+            vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE,
+                TaskTerminationCause.AM_USERCODE_FAILURE);
             return VertexState.TERMINATING;
           }
         } else {
@@ -3515,6 +3553,23 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
+  private static class TaskAttempStatusUpdateEventTransition implements
+      SingleArcTransition<VertexImpl, VertexEvent> {
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent event) {
+      VertexEventTaskAttemptStatusUpdate updateEvent =
+        ((VertexEventTaskAttemptStatusUpdate) event);
+      if (vertex.isSpeculationEnabled()) {
+        if (updateEvent.hasJustStarted()) {
+          vertex.speculator.notifyAttemptStarted(updateEvent.getAttemptId(),
+              updateEvent.getTimestamp());
+        } else {
+          vertex.speculator.notifyAttemptStatusUpdate(updateEvent.getAttemptId(),
+              updateEvent.getTaskAttemptState(), updateEvent.getTimestamp());
+        }
+      }
+    }
+  }
   private static class TaskCompletedTransition implements
       MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/DataStatistics.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/DataStatistics.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/DataStatistics.java
new file mode 100644
index 0000000..7e6f1c2
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/DataStatistics.java
@@ -0,0 +1,86 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.speculation.legacy;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/** Running count, sum and sum of squares of samples, with derived mean/variance/outlier values. */
+public class DataStatistics {
+  private int count = 0;
+  private double sum = 0;
+  private double sumSquares = 0;
+
+  public DataStatistics() {
+  }
+
+  public DataStatistics(double initNum) {
+    this.count = 1;
+    this.sum = initNum;
+    this.sumSquares = initNum * initNum;
+  }
+
+  public synchronized void add(double newNum) {
+    this.count++;
+    this.sum += newNum;
+    this.sumSquares += newNum * newNum;
+  }
+
+  @VisibleForTesting
+  synchronized void updateStatistics(double old, double update) {
+    this.sum += update - old;
+    this.sumSquares += (update * update) - (old * old);
+  }
+
+  public synchronized double mean() {
+    // When there is no data the mean estimate should be large,
+    // so report Long.MAX_VALUE instead of 0.0.
+    return count == 0 ? Long.MAX_VALUE : sum/count;
+  }
+
+  public synchronized double var() {
+    // E(X^2) - E(X)^2
+    if (count <= 1) {
+      return 0.0;
+    }
+    double mean = mean();
+    return Math.max((sumSquares/count) - mean * mean, 0.0d);
+  }
+
+  public synchronized double std() {
+    return Math.sqrt(this.var());
+  }
+
+  public synchronized double outlier(float sigma) {
+    // When no data is available the outlier estimate should be large.
+    if (count == 0) {
+      return Long.MAX_VALUE;
+    }
+    return mean() + std() * sigma;
+  }
+
+  public synchronized double count() {
+    return count;
+  }
+
+  @Override
+  public synchronized String toString() {
+    return "DataStatistics: count is " + count + ", sum is " + sum +
+        ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
new file mode 100644
index 0000000..8f76e05
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
@@ -0,0 +1,396 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.speculation.legacy;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Maintains runtime estimation statistics. Periodically updates the
+ * estimates based on progress and decides when to trigger a
+ * speculative attempt. Speculative attempts are triggered when the
+ * estimated runtime is more than a threshold beyond the mean runtime
+ * and the original task still has enough estimated runtime left that
+ * the speculative version is expected to finish sooner than that. If
+ * the original is close to completion then we don't start a speculation
+ * because it would likely be a wasted attempt. There is a delay between
+ * successive speculations.
+ */
+public class LegacySpeculator {
+  
+  private static final long ON_SCHEDULE = Long.MIN_VALUE;
+  private static final long ALREADY_SPECULATING = Long.MIN_VALUE + 1;
+  private static final long TOO_NEW = Long.MIN_VALUE + 2;
+  private static final long PROGRESS_IS_GOOD = Long.MIN_VALUE + 3;
+  private static final long NOT_RUNNING = Long.MIN_VALUE + 4;
+  private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5;
+
+  private static final long SOONEST_RETRY_AFTER_NO_SPECULATE = 1000L * 1L;
+  private static final long SOONEST_RETRY_AFTER_SPECULATE = 1000L * 15L;
+
+  private static final double PROPORTION_RUNNING_TASKS_SPECULATABLE = 0.1;
+  private static final double PROPORTION_TOTAL_TASKS_SPECULATABLE = 0.01;
+  private static final int  MINIMUM_ALLOWED_SPECULATIVE_TASKS = 10;
+
+  private static final Log LOG = LogFactory.getLog(LegacySpeculator.class);
+
+  private final ConcurrentMap<TezTaskID, Boolean> runningTasks
+      = new ConcurrentHashMap<TezTaskID, Boolean>();
+
+  // Used to track any TaskAttempts that aren't heart-beating for a while, so
+  // that we can aggressively speculate instead of waiting for task-timeout.
+  private final ConcurrentMap<TezTaskAttemptID, TaskAttemptHistoryStatistics>
+      runningTaskAttemptStatistics = new ConcurrentHashMap<TezTaskAttemptID,
+          TaskAttemptHistoryStatistics>();
+  // Regular heartbeat from tasks is every 3 secs. So if we don't get a
+  // heartbeat in 9 secs (3 heartbeats), we simulate a heartbeat with no change
+  // in progress.
+  private static final long MAX_WAITTING_TIME_FOR_HEARTBEAT = 9 * 1000;
+
+
+  private final Set<TezTaskID> mayHaveSpeculated = new HashSet<TezTaskID>();
+
+  private Vertex vertex;
+  private TaskRuntimeEstimator estimator;
+
+  private final Clock clock;
+  private long nextSpeculateTime = Long.MIN_VALUE;
+
+  public LegacySpeculator(Configuration conf, AppContext context, Vertex vertex) {
+    this(conf, context.getClock(), vertex);
+  }
+
+  public LegacySpeculator(Configuration conf, Clock clock, Vertex vertex) {
+    this(conf, getEstimator(conf, vertex), clock, vertex);
+  }
+  
+  static private TaskRuntimeEstimator getEstimator
+      (Configuration conf, Vertex vertex) {
+    TaskRuntimeEstimator estimator = new LegacyTaskRuntimeEstimator();
+    estimator.contextualize(conf, vertex);
+    
+    return estimator;
+  }
+
+  // This constructor is designed to be called by other constructors.
+  //  However, it's public because we do use it in the test cases.
+  // Normally we figure out our own estimator.
+  public LegacySpeculator
+      (Configuration conf, TaskRuntimeEstimator estimator, Clock clock, Vertex vertex) {
+    this.vertex = vertex;
+    this.estimator = estimator;
+    this.clock = clock;
+  }
+
+/*   *************************************************************    */
+
+  void maybeSpeculate() {
+    long now = clock.getTime();
+    
+    if (now < nextSpeculateTime) {
+      return;
+    }
+    
+    int speculations = maybeScheduleASpeculation();
+    long mininumRecomp
+        = speculations > 0 ? SOONEST_RETRY_AFTER_SPECULATE
+                           : SOONEST_RETRY_AFTER_NO_SPECULATE;
+
+    long wait = Math.max(mininumRecomp,
+          clock.getTime() - now);
+    nextSpeculateTime = now + wait;
+
+    if (speculations > 0) {
+      LOG.info("We launched " + speculations
+          + " speculations.  Waiting " + wait + " milliseconds.");
+    }
+  }
+
+/*   *************************************************************    */
+
+  public void notifyAttemptStarted(TezTaskAttemptID taId, long timestamp) {
+    estimator.enrollAttempt(taId, timestamp);    
+  }
+  
+  public void notifyAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState reportedState,
+      long timestamp) {
+    statusUpdate(taId, reportedState, timestamp);
+    maybeSpeculate();
+  }
+
+  /**
+   * Absorbs one task attempt status update.
+   *
+   * @param attemptID the attempt the status update is for
+   * @param reportedState the state reported for that attempt, folded into
+   *        the estimator and the running-task bookkeeping
+   * @param timestamp the time this status corresponds to
+   */
+  private void statusUpdate(TezTaskAttemptID attemptID, TaskAttemptState reportedState, long timestamp) {
+
+    TezTaskID taskID = attemptID.getTaskID();
+    Task task = vertex.getTask(taskID);
+
+    Preconditions.checkState(task != null, "Null task for attempt: " + attemptID);
+
+    estimator.updateAttempt(attemptID, reportedState, timestamp);
+
+    if (reportedState == TaskAttemptState.RUNNING) {
+      runningTasks.putIfAbsent(taskID, Boolean.TRUE);
+    } else {
+      runningTasks.remove(taskID, Boolean.TRUE);
+      // Keep the heartbeat-tracking entry while the attempt is still STARTING;
+      // drop it for any other non-RUNNING state so stale entries do not pile up.
+      if (reportedState != TaskAttemptState.STARTING) {
+        runningTaskAttemptStatistics.remove(attemptID);
+      }
+    }
+  }
+
+/*   *************************************************************    */
+
+// This is the code section that runs periodically and adds speculations for
+//  those jobs that need them.
+
+
+  // This can return a few magic values for tasks that shouldn't speculate:
+  //  returns ON_SCHEDULE if thresholdRuntime(taskID) says that we should not
+  //     consider speculating this task
+  //  returns ALREADY_SPECULATING if that is true.  This has priority.
+  //  returns TOO_NEW if our companion task hasn't gotten any information
+  //  returns PROGRESS_IS_GOOD if the task is sailing through
+  //  returns NOT_RUNNING if the task is not running
+  //
+  // All of these values are negative.  Any value that should be allowed to
+  //  speculate is 0 or positive.
+  private long speculationValue(Task task, long now) {
+    Map<TezTaskAttemptID, TaskAttempt> attempts = task.getAttempts();
+    TezTaskID taskID = task.getTaskId();
+    long acceptableRuntime = Long.MIN_VALUE;
+    long result = Long.MIN_VALUE;
+
+    // short circuit completed tasks. no need to spend time on them
+    if (task.getState() == TaskState.SUCCEEDED) {
+      return NOT_RUNNING;
+    }
+    
+    if (!mayHaveSpeculated.contains(taskID)) {
+      acceptableRuntime = estimator.thresholdRuntime(taskID);
+      if (acceptableRuntime == Long.MAX_VALUE) {
+        return ON_SCHEDULE;
+      }
+    }
+
+    TezTaskAttemptID runningTaskAttemptID = null;
+
+    int numberRunningAttempts = 0;
+
+    for (TaskAttempt taskAttempt : attempts.values()) {
+      if (taskAttempt.getState() == TaskAttemptState.RUNNING
+          || taskAttempt.getState() == TaskAttemptState.STARTING) {
+        if (++numberRunningAttempts > 1) {
+          return ALREADY_SPECULATING;
+        }
+        runningTaskAttemptID = taskAttempt.getID();
+
+        long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID);
+
+        long taskAttemptStartTime
+            = estimator.attemptEnrolledTime(runningTaskAttemptID);
+        if (taskAttemptStartTime > now) {
+          // This background process ran before we could process the task
+          //  attempt status change that chronicles the attempt start
+          return TOO_NEW;
+        }
+
+        long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;
+
+        long estimatedReplacementEndTime
+            = now + estimator.newAttemptEstimatedRuntime();
+
+        float progress = taskAttempt.getProgress();
+        TaskAttemptHistoryStatistics data =
+            runningTaskAttemptStatistics.get(runningTaskAttemptID);
+        if (data == null) {
+          runningTaskAttemptStatistics.put(runningTaskAttemptID,
+            new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now));
+        } else {
+          if (estimatedRunTime == data.getEstimatedRunTime()
+              && progress == data.getProgress()) {
+            // Previous stats are the same as the current stats
+            if (data.notHeartbeatedInAWhile(now)) {
+              // Stats have stagnated for a while, simulate heart-beat.
+              // Now simulate the heart-beat
+              statusUpdate(taskAttempt.getID(), taskAttempt.getState(), clock.getTime());
+            }
+          } else {
+            // Stats have changed - update our data structure
+            data.setEstimatedRunTime(estimatedRunTime);
+            data.setProgress(progress);
+            data.resetHeartBeatTime(now);
+          }
+        }
+
+        if (estimatedEndTime < now) {
+          return PROGRESS_IS_GOOD;
+        }
+
+        if (estimatedReplacementEndTime >= estimatedEndTime) {
+          return TOO_LATE_TO_SPECULATE;
+        }
+
+        result = estimatedEndTime - estimatedReplacementEndTime;
+      }
+    }
+
+    // If we are here, there's at most one task attempt.
+    if (numberRunningAttempts == 0) {
+      return NOT_RUNNING;
+    }
+
+
+
+    if (acceptableRuntime == Long.MIN_VALUE) {
+      acceptableRuntime = estimator.thresholdRuntime(taskID);
+      if (acceptableRuntime == Long.MAX_VALUE) {
+        return ON_SCHEDULE;
+      }
+    }
+
+    return result;
+  }
+
+  // Asks the vertex to schedule a speculative attempt for the task and remembers that we did.
+  protected void addSpeculativeAttempt(TezTaskID taskID) {
+    // The message prefix names this class so that log searches land here.
+    LOG.info("LegacySpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
+    vertex.scheduleSpeculativeTask(taskID);
+    mayHaveSpeculated.add(taskID);
+  }
+
+  private int maybeScheduleASpeculation() {
+    int successes = 0;
+
+    long now = clock.getTime();
+
+    int numberSpeculationsAlready = 0;
+    int numberRunningTasks = 0;
+
+    Map<TezTaskID, Task> tasks = vertex.getTasks();
+
+    int numberAllowedSpeculativeTasks
+        = (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,
+                         PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());
+
+    TezTaskID bestTaskID = null;
+    long bestSpeculationValue = -1L;
+
+    // this loop is potentially pricey.
+    // TODO track the tasks that are potentially worth looking at
+    for (Map.Entry<TezTaskID, Task> taskEntry : tasks.entrySet()) {
+      long mySpeculationValue = speculationValue(taskEntry.getValue(), now);
+
+      if (mySpeculationValue == ALREADY_SPECULATING) {
+        ++numberSpeculationsAlready;
+      }
+
+      if (mySpeculationValue != NOT_RUNNING) {
+        ++numberRunningTasks;
+      }
+
+      if (mySpeculationValue > bestSpeculationValue) {
+        bestTaskID = taskEntry.getKey();
+        bestSpeculationValue = mySpeculationValue;
+      }
+    }
+    numberAllowedSpeculativeTasks
+        = (int) Math.max(numberAllowedSpeculativeTasks,
+                         PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks);
+
+    // If we found a speculation target, fire it off
+    if (bestTaskID != null
+        && numberAllowedSpeculativeTasks > numberSpeculationsAlready) {
+      addSpeculativeAttempt(bestTaskID);
+      ++successes;
+    }
+
+    return successes;
+  }
+
+  static class TaskAttemptHistoryStatistics {
+
+    private long estimatedRunTime;
+    private float progress;
+    private long lastHeartBeatTime;
+
+    public TaskAttemptHistoryStatistics(long estimatedRunTime, float progress,
+        long nonProgressStartTime) {
+      this.estimatedRunTime = estimatedRunTime;
+      this.progress = progress;
+      resetHeartBeatTime(nonProgressStartTime);
+    }
+
+    public long getEstimatedRunTime() {
+      return this.estimatedRunTime;
+    }
+
+    public float getProgress() {
+      return this.progress;
+    }
+
+    public void setEstimatedRunTime(long estimatedRunTime) {
+      this.estimatedRunTime = estimatedRunTime;
+    }
+
+    public void setProgress(float progress) {
+      this.progress = progress;
+    }
+
+    public boolean notHeartbeatedInAWhile(long now) {
+      if (now - lastHeartBeatTime <= MAX_WAITTING_TIME_FOR_HEARTBEAT) {
+        return false;
+      } else {
+        resetHeartBeatTime(now);
+        return true;
+      }
+    }
+
+    public void resetHeartBeatTime(long lastHeartBeatTime) {
+      this.lastHeartBeatTime = lastHeartBeatTime;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacyTaskRuntimeEstimator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacyTaskRuntimeEstimator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacyTaskRuntimeEstimator.java
new file mode 100644
index 0000000..14d269c
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacyTaskRuntimeEstimator.java
@@ -0,0 +1,136 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.speculation.legacy;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+/**
+ * Runtime estimator that uses a simple scheme of estimating task attempt
+ * runtime based on current elapsed runtime and reported progress. 
+ */
+public class LegacyTaskRuntimeEstimator extends StartEndTimesBase {
+
+  private final Map<TaskAttempt, AtomicLong> attemptRuntimeEstimates
+      = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
+  private final ConcurrentHashMap<TaskAttempt, AtomicLong> attemptRuntimeEstimateVariances
+      = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
+
+  @Override
+  public void updateAttempt(TezTaskAttemptID attemptID, TaskAttemptState state, long timestamp) {
+    super.updateAttempt(attemptID, state, timestamp);
+    
+
+    Task task = vertex.getTask(attemptID.getTaskID());
+
+    if (task == null) {
+      return;
+    }
+
+    TaskAttempt taskAttempt = task.getAttempt(attemptID);
+
+    if (taskAttempt == null) {
+      return;
+    }
+    
+    float progress = taskAttempt.getProgress();
+
+    Long boxedStart = startTimes.get(attemptID);
+    long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
+
+    // We need to do two things.
+    //  1: If this is a completion, we accumulate statistics in the superclass
+    //  2: If this is not a completion, we learn more about it.
+
+    // This is not a completion, but we're cooking.
+    //
+    if (taskAttempt.getState() == TaskAttemptState.RUNNING) {
+      // See if this task is already in the registry
+      AtomicLong estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
+      AtomicLong estimateVarianceContainer
+          = attemptRuntimeEstimateVariances.get(taskAttempt);
+
+      if (estimateContainer == null) {
+        if (attemptRuntimeEstimates.get(taskAttempt) == null) {
+          attemptRuntimeEstimates.put(taskAttempt, new AtomicLong());
+
+          estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
+        }
+      }
+
+      if (estimateVarianceContainer == null) {
+        attemptRuntimeEstimateVariances.putIfAbsent(taskAttempt, new AtomicLong());
+        estimateVarianceContainer = attemptRuntimeEstimateVariances.get(taskAttempt);
+      }
+
+
+      long estimate = -1;
+      long varianceEstimate = -1;
+
+      // This code assumes that we'll never consider starting a third
+      //  speculative task attempt if two are already running for this task
+      if (start > 0 && timestamp > start) {
+        estimate = (long) ((timestamp - start) / Math.max(0.0001, progress));
+        varianceEstimate = (long) (estimate * progress / 10);
+      }
+      if (estimateContainer != null) {
+        estimateContainer.set(estimate);
+      }
+      if (estimateVarianceContainer != null) {
+        estimateVarianceContainer.set(varianceEstimate);
+      }
+    }
+  }
+
+  private long storedPerAttemptValue
+       (Map<TaskAttempt, AtomicLong> data, TezTaskAttemptID attemptID) {
+    Task task = vertex.getTask(attemptID.getTaskID());
+
+    if (task == null) {
+      return -1L;
+    }
+
+    TaskAttempt taskAttempt = task.getAttempt(attemptID);
+
+    if (taskAttempt == null) {
+      return -1L;
+    }
+
+    AtomicLong estimate = data.get(taskAttempt);
+
+    return estimate == null ? -1L : estimate.get();
+
+  }
+
+  @Override
+  public long estimatedRuntime(TezTaskAttemptID attemptID) {
+    return storedPerAttemptValue(attemptRuntimeEstimates, attemptID);
+  }
+
+  @Override
+  public long runtimeEstimateVariance(TezTaskAttemptID attemptID) {
+    return storedPerAttemptValue(attemptRuntimeEstimateVariances, attemptID);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/StartEndTimesBase.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/StartEndTimesBase.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/StartEndTimesBase.java
new file mode 100644
index 0000000..d4d1a7f
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/StartEndTimesBase.java
@@ -0,0 +1,138 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.speculation.legacy;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+
+/**
+ * Base class that takes the attempt runtime estimates from a derived class
+ * and uses them to determine outliers, i.e. attempts that deviate beyond the
+ * mean estimated runtime by some threshold.
+ */
+abstract class StartEndTimesBase implements TaskRuntimeEstimator {
+  static final float MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE
+      = 0.05F;
+  static final int MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
+      = 1;
+
+  protected Vertex vertex;
+
+  protected final Map<TezTaskAttemptID, Long> startTimes
+      = new ConcurrentHashMap<TezTaskAttemptID, Long>();
+
+  protected final DataStatistics taskStatistics = new DataStatistics();
+
+  private float slowTaskRelativeTresholds;
+
+  protected final Set<Task> doneTasks = new HashSet<Task>();
+
+  @Override
+  public void enrollAttempt(TezTaskAttemptID id, long timestamp) {
+    startTimes.put(id, timestamp);
+  }
+
+  @Override
+  public long attemptEnrolledTime(TezTaskAttemptID attemptID) {
+    Long result = startTimes.get(attemptID);
+
+    return result == null ? Long.MAX_VALUE : result;
+  }
+
+  @Override
+  public void contextualize(Configuration conf, Vertex vertex) {
+    slowTaskRelativeTresholds = conf.getFloat(
+        TezConfiguration.TEZ_AM_LEGACY_SPECULATIVE_SLOWTASK_THRESHOLD, 1.0f);
+    this.vertex = vertex;
+  }
+
+  protected DataStatistics dataStatisticsForTask(TezTaskID taskID) {
+    return taskStatistics;
+  }
+
+  @Override
+  public long thresholdRuntime(TezTaskID taskID) {
+    int completedTasks = vertex.getCompletedTasks();
+
+    int totalTasks = vertex.getTotalTasks();
+    
+    if (completedTasks < MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
+        || (((float)completedTasks) / totalTasks)
+              < MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE ) {
+      return Long.MAX_VALUE;
+    }
+    
+    long result = (long)taskStatistics.outlier(slowTaskRelativeTresholds);
+    return result;
+  }
+
+  @Override
+  public long newAttemptEstimatedRuntime() {
+    return (long)taskStatistics.mean();
+  }
+
+  /**
+   * Folds a status update into the runtime statistics. Only the first
+   * SUCCEEDED attempt of a task contributes a sample: timestamp minus the
+   * attempt's enrolled start time.
+   */
+  @Override
+  public void updateAttempt(TezTaskAttemptID attemptID, TaskAttemptState state, long timestamp) {
+    Task task = vertex.getTask(attemptID.getTaskID());
+    if (task == null) {
+      return;
+    }
+
+    TaskAttempt taskAttempt = task.getAttempt(attemptID);
+    // No such attempt, or not a success: nothing to record.
+    if (taskAttempt == null || taskAttempt.getState() != TaskAttemptState.SUCCEEDED) {
+      return;
+    }
+
+    // Note that if a task completes twice [because of a previous speculation
+    //  and a race, or a success followed by loss of the machine with the
+    //  local data] we only count the first one.
+    boolean isNew;
+    synchronized (doneTasks) {
+      isNew = doneTasks.add(task);
+    }
+    if (!isNew) {
+      return;
+    }
+
+    Long boxedStart = startTimes.get(attemptID);
+    long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
+    long finish = timestamp;
+    // Skip samples with an unknown start or an inconsistent interval.
+    if (start > 1L && finish > 1L && start <= finish) {
+      taskStatistics.add(finish - start);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/TaskRuntimeEstimator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/TaskRuntimeEstimator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/TaskRuntimeEstimator.java
new file mode 100644
index 0000000..c8edd1e
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/TaskRuntimeEstimator.java
@@ -0,0 +1,91 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.speculation.legacy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+
+/**
+ * Contract for components that estimate how long the tasks of a given vertex
+ * will run.
+ */
+public interface TaskRuntimeEstimator {
+  /**
+   * Registers an attempt together with a timestamp.
+   * NOTE(review): presumably the timestamp is the attempt's start time and is
+   * what {@link #attemptEnrolledTime(TezTaskAttemptID)} returns later -
+   * confirm against the implementations.
+   */
+  void enrollAttempt(TezTaskAttemptID id, long timestamp);
+
+  /**
+   * NOTE(review): presumably returns the timestamp passed to
+   * {@link #enrollAttempt(TezTaskAttemptID, long)} for this attempt; the value
+   * returned for an attempt that was never enrolled is not specified here.
+   */
+  long attemptEnrolledTime(TezTaskAttemptID attemptID);
+
+  /**
+   * Reports a state for an attempt together with the time of the report.
+   */
+  void updateAttempt(TezTaskAttemptID taId, TaskAttemptState reportedState, long timestamp);
+
+  /**
+   * Hands the estimator its configuration and the vertex it works for.
+   * NOTE(review): presumably has to be called before any other method -
+   * confirm against the callers.
+   */
+  void contextualize(Configuration conf, Vertex vertex);
+
+  /**
+   * Finds the maximum reasonable wallclock execution time of a task,
+   * including the time that has already elapsed.  If the projected total
+   * execution time of the task ever exceeds this value, the task may be
+   * speculated.
+   *
+   * @param id the {@link TezTaskID} of the task we are asking about
+   * @return the task's maximum reasonable runtime, or
+   *         {@code Long.MAX_VALUE} if there is not enough information to rule
+   *         out any runtime, however long.
+   */
+  long thresholdRuntime(TezTaskID id);
+
+  /**
+   * Estimates the total runtime of a task attempt, including the time that
+   * has already elapsed.
+   *
+   * @param id the {@link TezTaskAttemptID} of the attempt we are asking about
+   * @return the best estimate of the attempt's runtime, or {@code -1} if
+   *         there is not enough information yet to produce an estimate.
+   */
+  long estimatedRuntime(TezTaskAttemptID id);
+
+  /**
+   * Estimates how long a new attempt would take if it were started now.
+   *
+   * @return the best estimate of a new attempt's runtime, or {@code -1} if
+   *         there is not enough information yet to produce an estimate.
+   */
+  long newAttemptEstimatedRuntime();
+
+  /**
+   * Computes the width of the error band around the runtime estimate that
+   * {@link #estimatedRuntime(TezTaskAttemptID)} returns.
+   *
+   * @param id the {@link TezTaskAttemptID} of the attempt we are asking about
+   * @return the width of the error band of the attempt's runtime estimate,
+   *         or {@code -1} if there is not enough information yet to produce
+   *         an estimate.
+   */
+  long runtimeEstimateVariance(TezTaskAttemptID id);
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/MockClock.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockClock.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockClock.java
new file mode 100644
index 0000000..d015714
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockClock.java
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import org.apache.hadoop.yarn.util.Clock;
+
+/**
+ * Manually advanced {@link Clock} for tests: the reported time starts at 1000
+ * and only changes when {@link #incrementTime(long)} is called.
+ */
+public class MockClock implements Clock {
+
+  // volatile because the clock is advanced by one thread (the mock container
+  // launcher loop calls incrementTime) and may be read by others through
+  // getTime(); a plain long gives neither visibility nor atomic 64-bit access.
+  volatile long time = 1000;
+
+  /** Returns the current simulated time. */
+  @Override
+  public long getTime() {
+    return time;
+  }
+
+  /**
+   * Moves the simulated time forward by {@code inc}. Synchronized so that
+   * concurrent increments are not lost.
+   */
+  public synchronized void incrementTime(long inc) {
+    time += inc;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index b4109e7..a548e3c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -50,6 +50,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
@@ -75,9 +76,12 @@ public class MockDAGAppMaster extends DAGAppMaster {
     
     AtomicBoolean startScheduling = new AtomicBoolean(true);
     AtomicBoolean goFlag;
+    boolean updateProgress = true;
     
     Map<TezTaskID, Integer> preemptedTasks = Maps.newConcurrentMap();
     
+    Map<TezTaskAttemptID, Integer> tasksWithStatusUpdates = Maps.newConcurrentMap();
+    
     public MockContainerLauncher(AtomicBoolean goFlag) {
       super("MockContainerLauncher");
       this.goFlag = goFlag;
@@ -88,6 +92,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
       TezTaskAttemptID taId;
       String vName;
       ContainerLaunchContext launchContext;
+      int numUpdates = 0;
       boolean completed;
       
       public ContainerData(ContainerId cId, ContainerLaunchContext context) {
@@ -149,6 +154,10 @@ public class MockDAGAppMaster extends DAGAppMaster {
     public void startScheduling(boolean value) {
       startScheduling.set(value);
     }
+    
+    /**
+     * Selects what the simulated status updates report: the run loop sends
+     * the fraction of updates done so far when the flag is true, and a
+     * progress of 0 when it is false.
+     */
+    public void updateProgress(boolean value) {
+      updateProgress = value;
+    }
 
     public Map<ContainerId, ContainerData> getContainers() {
       return containers;
@@ -164,6 +173,10 @@ public class MockDAGAppMaster extends DAGAppMaster {
       cData.clear();
     }
     
+    /**
+     * Sets how many status updates the run loop sends for the given attempt
+     * before it goes on to preempt or complete it. Attempts without an entry
+     * get a single update.
+     */
+    public void setStatusUpdatesForTask(TezTaskAttemptID tId, int numUpdates) {
+      tasksWithStatusUpdates.put(tId, Integer.valueOf(numUpdates));
+    }
+    
     void stop(NMCommunicatorStopRequestEvent event) {
       // remove from simulated container list
       containers.remove(event.getContainerId());
@@ -183,6 +196,13 @@ public class MockDAGAppMaster extends DAGAppMaster {
         Thread.sleep(50);
       }
     }
+    
+    /**
+     * Advances the application clock by {@code inc} when that clock is a
+     * {@link MockClock}; does nothing for any other clock.
+     */
+    void incrementTime(long inc) {
+      Clock appClock = getContext().getClock();
+      if (!(appClock instanceof MockClock)) {
+        return;
+      }
+      MockClock mockClock = (MockClock) appClock;
+      mockClock.incrementTime(inc);
+    }
 
     @Override
     public void run() {
@@ -192,6 +212,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
         if (!startScheduling.get()) { // schedule when asked to do so by the test code
           continue;
         }
+        incrementTime(1000);
         for (Map.Entry<ContainerId, ContainerData> entry : containers.entrySet()) {
           ContainerData cData = entry.getValue();
           ContainerId cId = entry.getKey();
@@ -214,8 +235,19 @@ public class MockDAGAppMaster extends DAGAppMaster {
           } else if (!cData.completed) {
             // container is assigned a task and task is not completed
             // complete the task or preempt the task
-            Integer version = preemptedTasks.get(cData.taId.getTaskID()); 
-            if (version != null && cData.taId.getId() <= version.intValue()) {
+            Integer version = preemptedTasks.get(cData.taId.getTaskID());
+            Integer updatesToMake = tasksWithStatusUpdates.get(cData.taId);
+            if (cData.numUpdates == 0 || // do at least one update
+                updatesToMake != null && cData.numUpdates < updatesToMake) {
+              cData.numUpdates++;
+              float maxUpdates = (updatesToMake != null) ? updatesToMake.intValue() : 1;
+              float progress = updateProgress ? cData.numUpdates/maxUpdates : 0f;
+              TezVertexID vertexId = cData.taId.getTaskID().getVertexID();
+              getContext().getEventHandler().handle(
+                  new VertexEventRouteEvent(vertexId, Collections.singletonList(new TezEvent(
+                      new TaskStatusUpdateEvent(null, progress), new EventMetaData(
+                          EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId)))));
+            } else if (version != null && cData.taId.getId() <= version.intValue()) {
               preemptContainer(cData);
             } else {
               // send a done notification

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
index 7e408e1..2631e3c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
@@ -29,16 +29,18 @@ import org.apache.tez.client.LocalClient;
 public class MockLocalClient extends LocalClient {
   MockDAGAppMaster mockApp;
   AtomicBoolean mockAppLauncherGoFlag;
+  Clock mockClock;
   
-  public MockLocalClient(AtomicBoolean mockAppLauncherGoFlag) {
+  public MockLocalClient(AtomicBoolean mockAppLauncherGoFlag, Clock clock) {
     this.mockAppLauncherGoFlag = mockAppLauncherGoFlag;
+    this.mockClock = clock;
   }
   
   protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId cId, String currentHost, int nmPort, int nmHttpPort,
       Clock clock, long appSubmitTime, boolean isSession, String userDir) {
     mockApp = new MockDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
-        new SystemClock(), appSubmitTime, isSession, userDir, mockAppLauncherGoFlag);
+        (mockClock!=null ? mockClock : clock), appSubmitTime, isSession, userDir, mockAppLauncherGoFlag);
     return mockApp;
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
index 617415e..0ff3340 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.client.FrameworkClient;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -32,9 +33,9 @@ public class MockTezClient extends TezClient {
   
   MockTezClient(String name, TezConfiguration tezConf, boolean isSession,
       Map<String, LocalResource> localResources, Credentials credentials,
-      AtomicBoolean mockAppLauncherGoFlag) {
+      Clock clock, AtomicBoolean mockAppLauncherGoFlag) {
     super(name, tezConf, isSession, localResources, credentials);
-    this.client = new MockLocalClient(mockAppLauncherGoFlag);
+    this.client = new MockLocalClient(mockAppLauncherGoFlag, clock);
   }
   
   protected FrameworkClient createFrameworkClient() {

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 8650aea..682e6ed 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -67,7 +67,7 @@ public class TestMockDAGAppMaster {
   public void testLocalResourceSetup() throws Exception {
     TezConfiguration tezconf = new TezConfiguration(defaultConf);
     
-    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null);
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
     tezClient.start();
     
     MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
@@ -119,7 +119,7 @@ public class TestMockDAGAppMaster {
 
     TezConfiguration tezconf = new TezConfiguration(defaultConf);
     
-    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null);
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
     tezClient.start();
     DAGClient dagClient = tezClient.submitDAG(dag);
     dagClient.waitForCompletion();
@@ -127,7 +127,7 @@ public class TestMockDAGAppMaster {
     tezClient.stop();
     
     // submit the same DAG again to verify it can be done.
-    tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null);
+    tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
     tezClient.start();
     dagClient = tezClient.submitDAG(dag);
     dagClient.waitForCompletion();
@@ -139,7 +139,7 @@ public class TestMockDAGAppMaster {
   public void testSchedulerErrorHandling() throws Exception {
     TezConfiguration tezconf = new TezConfiguration(defaultConf);
 
-    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null);
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
     tezClient.start();
 
     MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
index 0958c48..bc15954 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
@@ -92,7 +92,7 @@ public class TestPreemption {
     tezconf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 0);
     AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false);
     MockTezClient tezClient = new MockTezClient("testPreemption", tezconf, false, null, null,
-        mockAppLauncherGoFlag);
+        null, mockAppLauncherGoFlag);
     tezClient.start();
     
     DAGClient dagClient = tezClient.submitDAG(createDAG(DataMovementType.SCATTER_GATHER));
@@ -148,7 +148,7 @@ public class TestPreemption {
     tezconf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 0);
     AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false);
     MockTezClient tezClient = new MockTezClient("testPreemption", tezconf, true, null, null,
-        mockAppLauncherGoFlag);
+        null, mockAppLauncherGoFlag);
     tezClient.start();
     syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient);
     return tezClient;

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
new file mode 100644
index 0000000..114c44b
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
@@ -0,0 +1,161 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+
+
+/**
+ * Runs small DAGs against the mock application master with speculation
+ * enabled and a {@link MockClock}, and checks which attempt of task 0 ends up
+ * as the successful one.
+ */
+@SuppressWarnings("deprecation")
+public class TestSpeculation {
+  static Configuration defaultConf;
+  static FileSystem localFs;
+  static Path workDir;
+  
+  // Populated by syncWithMockAppLauncher() once the mock app has come up.
+  MockDAGAppMaster mockApp;
+  MockContainerLauncher mockLauncher;
+  
+  // Shared configuration: local file system, local mode, speculation on.
+  static {
+    try {
+      defaultConf = new Configuration(false);
+      defaultConf.set("fs.defaultFS", "file:///");
+      defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+      defaultConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true);
+      defaultConf.setInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, 2);
+      localFs = FileSystem.getLocal(defaultConf);
+      workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+          "TestSpeculation").makeQualified(localFs);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+  
+  /**
+   * Starts a session-mode client backed by a fresh {@link MockClock} and
+   * returns it with scheduling in the mock launcher switched off, so that a
+   * test can configure the launcher before any task runs.
+   */
+  MockTezClient createTezSession() throws Exception {
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false);
+    MockTezClient tezClient = new MockTezClient("testspeculation", tezconf, true, null, null,
+        new MockClock(), mockAppLauncherGoFlag);
+    tezClient.start();
+    syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient);
+    return tezClient;
+  }
+  
+  /**
+   * Waits until the go flag has been set, then captures the mock app and its
+   * launcher, applies the requested scheduling mode and notifies a thread
+   * waiting on the flag.
+   */
+  void syncWithMockAppLauncher(boolean allowScheduling, AtomicBoolean mockAppLauncherGoFlag, 
+      MockTezClient tezClient) throws Exception {
+    synchronized (mockAppLauncherGoFlag) {
+      while (!mockAppLauncherGoFlag.get()) {
+        mockAppLauncherGoFlag.wait();
+      }
+      mockApp = tezClient.getLocalClient().getMockApp();
+      mockLauncher = mockApp.getContainerLauncher();
+      mockLauncher.startScheduling(allowScheduling);
+      mockAppLauncherGoFlag.notify();
+    }     
+  }
+  
+  /**
+   * Slows down attempt 0 of task 0 with 100 status updates and expects the
+   * task to finish with two attempts, attempt 1 being the successful one.
+   *
+   * @param withProgress passed to the launcher's updateProgress setting
+   */
+  public void testBasicSpeculation(boolean withProgress) throws Exception {
+    DAG dag = DAG.create("test");
+    Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
+    dag.addVertex(vA);
+
+    MockTezClient tezClient = createTezSession();
+    
+    DAGClient dagClient = tezClient.submitDAG(dag);
+    DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+    TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), 0);
+    // original attempt is killed and speculative one is successful
+    TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
+    TezTaskAttemptID successTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1);
+
+    mockLauncher.updateProgress(withProgress);
+    mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
+
+    mockLauncher.startScheduling(true);
+    dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+    Task task = dagImpl.getTask(killedTaId.getTaskID());
+    Assert.assertEquals(2, task.getAttempts().size());
+    Assert.assertEquals(successTaId, task.getSuccessfulAttempt().getID());
+    TaskAttempt killedAttempt = task.getAttempt(killedTaId);
+    // NOTE(review): the boolean returned by contains() is discarded, so this
+    // line verifies nothing. Confirm the exact diagnostic text produced for
+    // the killed attempt before wrapping it in Assert.assertTrue.
+    Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed as speculative attempt");
+    tezClient.stop();
+  }
+  
+  @Test (timeout=10000)
+  public void testBasicSpeculationWithProgress() throws Exception {
+    testBasicSpeculation(true);
+  }
+
+  @Test (timeout=10000)
+  public void testBasicSpeculationWithoutProgress() throws Exception {
+    testBasicSpeculation(false);
+  }
+
+  /**
+   * Gives both attempt 0 and attempt 1 of task 0 100 status updates and
+   * expects the task to finish with two attempts, attempt 0 being the
+   * successful one.
+   */
+  @Test (timeout=10000)
+  public void testBasicSpeculationNotUseful() throws Exception {
+    DAG dag = DAG.create("test");
+    Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
+    dag.addVertex(vA);
+
+    MockTezClient tezClient = createTezSession();
+    
+    DAGClient dagClient = tezClient.submitDAG(dag);
+    DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+    TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), 0);
+    // original attempt is successful and speculative one is killed
+    TezTaskAttemptID successTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
+    TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1);
+
+    mockLauncher.setStatusUpdatesForTask(successTaId, 100);
+    mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
+
+    mockLauncher.startScheduling(true);
+    dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+    Task task = dagImpl.getTask(killedTaId.getTaskID());
+    Assert.assertEquals(2, task.getAttempts().size());
+    Assert.assertEquals(successTaId, task.getSuccessfulAttempt().getID());
+    TaskAttempt killedAttempt = task.getAttempt(killedTaId);
+    // NOTE(review): the boolean returned by contains() is discarded, so this
+    // line verifies nothing. Confirm the exact diagnostic text produced for
+    // the killed attempt before wrapping it in Assert.assertTrue.
+    Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed speculative attempt as");
+    tezClient.stop();
+  }
+
+
+}