You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/03 22:21:47 UTC

git commit: TEZ-718. Remove some unused classes - JobEndNotifier and Speculation. (Mohammad Kamrul Islam via sseth)

Updated Branches:
  refs/heads/master 00aecc826 -> 29a97cc1f


TEZ-718. Remove some unused classes - JobEndNotifier and Speculation.
(Mohammad Kamrul Islam via sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/29a97cc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/29a97cc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/29a97cc1

Branch: refs/heads/master
Commit: 29a97cc1f22a2fbb06ecda4df48d1342621e3391
Parents: 00aecc8
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Feb 3 13:21:00 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Feb 3 13:21:00 2014 -0800

----------------------------------------------------------------------
 .../org/apache/tez/dag/app/JobEndNotifier.java  | 187 -------
 .../tez/dag/app/dag/event/VertexEventType.java  |   3 -
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  28 --
 .../tez/dag/app/speculate/DataStatistics.java   |  78 ---
 .../dag/app/speculate/DefaultSpeculator.java    | 504 -------------------
 ...ponentiallySmoothedTaskRuntimeEstimator.java | 192 -------
 .../speculate/LegacyTaskRuntimeEstimator.java   | 145 ------
 .../app/speculate/NullTaskRuntimesEngine.java   |  72 ---
 .../tez/dag/app/speculate/Speculator.java       |  45 --
 .../tez/dag/app/speculate/SpeculatorEvent.java  |  86 ----
 .../dag/app/speculate/StartEndTimesBase.java    | 195 -------
 .../dag/app/speculate/TaskRuntimeEstimator.java |  90 ----
 .../app/speculate/TaskSpeculationPredicate.java |  38 --
 .../tez/dag/app/speculate/package-info.java     |  20 -
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |  26 -
 .../tez/mapreduce/hadoop/MRJobConfig.java       |  34 --
 16 files changed, 1743 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/JobEndNotifier.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/JobEndNotifier.java b/tez-dag/src/main/java/org/apache/tez/dag/app/JobEndNotifier.java
deleted file mode 100644
index 59ed685..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/JobEndNotifier.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app;
-
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.net.Proxy;
-import java.net.URL;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.mortbay.log.Log;
-
-/**
- * <p>This class handles job end notification. Submitters of jobs can choose to
- * be notified of the end of a job by supplying a URL to which a connection
- * will be established.
- * <ul><li> The URL connection is fire and forget by default.</li> <li>
- * User can specify number of retry attempts and a time interval at which to
- * attempt retries</li><li>
- * Cluster administrators can set final parameters to set maximum number of
- * tries (0 would disable job end notification) and max time interval and a
- * proxy if needed</li><li>
- * The URL may contain sentinels which will be replaced by jobId and jobStatus 
- * (eg. SUCCEEDED/KILLED/FAILED) </li> </ul>
- * </p>
- */
-public class JobEndNotifier implements Configurable {
-  private static final String JOB_ID = "$jobId";
-  private static final String JOB_STATUS = "$jobStatus";
-
-  private Configuration conf;
-  protected String userUrl;
-  protected String proxyConf;
-  protected int numTries; //Number of tries to attempt notification
-  protected int waitInterval; //Time to wait between retrying notification
-  protected URL urlToNotify; //URL to notify read from the config
-  protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
-
-  /**
-   * Parse the URL that needs to be notified of the end of the job, along
-   * with the number of retries in case of failure, the amount of time to
-   * wait between retries and proxy settings
-   * @param conf the configuration 
-   */
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-    
-    numTries = Math.min(
-      conf.getInt(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, 0) + 1
-      , conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, 1)
-    );
-    waitInterval = Math.min(
-    conf.getInt(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, 5)
-    , conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, 5)
-    );
-    waitInterval = (waitInterval < 0) ? 5 : waitInterval;
-
-    userUrl = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL);
-
-    proxyConf = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY);
-
-    //Configure the proxy to use if it's set. It should be set like
-    //proxyType@proxyHostname:port
-    if(proxyConf != null && !proxyConf.equals("") &&
-         proxyConf.lastIndexOf(":") != -1) {
-      int typeIndex = proxyConf.indexOf("@");
-      Proxy.Type proxyType = Proxy.Type.HTTP;
-      if(typeIndex != -1 &&
-        proxyConf.substring(0, typeIndex).compareToIgnoreCase("socks") == 0) {
-        proxyType = Proxy.Type.SOCKS;
-      }
-      String hostname = proxyConf.substring(typeIndex + 1,
-        proxyConf.lastIndexOf(":"));
-      String portConf = proxyConf.substring(proxyConf.lastIndexOf(":") + 1);
-      try {
-        int port = Integer.parseInt(portConf);
-        proxyToUse = new Proxy(proxyType,
-          new InetSocketAddress(hostname, port));
-        Log.info("Job end notification using proxy type \"" + proxyType + 
-        "\" hostname \"" + hostname + "\" and port \"" + port + "\"");
-      } catch(NumberFormatException nfe) {
-        Log.warn("Job end notification couldn't parse configured proxy's port "
-          + portConf + ". Not going to use a proxy");
-      }
-    }
-
-  }
-
-  public Configuration getConf() {
-    return conf;
-  }
-  
-  /**
-   * Notify the URL just once. Use best effort. Timeout hard coded to 5
-   * seconds.
-   */
-  protected boolean notifyURLOnce() {
-    boolean success = false;
-    try {
-      Log.info("Job end notification trying " + urlToNotify);
-      HttpURLConnection conn =
-        (HttpURLConnection) urlToNotify.openConnection(proxyToUse);
-      conn.setConnectTimeout(5*1000);
-      conn.setReadTimeout(5*1000);
-      conn.setAllowUserInteraction(false);
-      if(conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
-        Log.warn("Job end notification to " + urlToNotify +" failed with code: "
-        + conn.getResponseCode() + " and message \"" + conn.getResponseMessage()
-        +"\"");
-      }
-      else {
-        success = true;
-        Log.info("Job end notification to " + urlToNotify + " succeeded");
-      }
-    } catch(IOException ioe) {
-      Log.warn("Job end notification to " + urlToNotify + " failed", ioe);
-    }
-    return success;
-  }
-
-  /**
-   * Notify a server of the completion of a submitted job. The user must have
-   * configured MRJobConfig.MR_JOB_END_NOTIFICATION_URL
-   * @param jobReport JobReport used to read JobId and JobStatus
-   * @throws InterruptedException
-   */
-  public void notify(JobReport jobReport)
-    throws InterruptedException {
-    // Do we need job-end notification?
-    if (userUrl == null) {
-      Log.info("Job end notification URL not set, skipping.");
-      return;
-    }
-
-    //Do string replacements for jobId and jobStatus
-    if (userUrl.contains(JOB_ID)) {
-      userUrl = userUrl.replace(JOB_ID, jobReport.getJobId().toString());
-    }
-    if (userUrl.contains(JOB_STATUS)) {
-      userUrl = userUrl.replace(JOB_STATUS, jobReport.getJobState().toString());
-    }
-
-    // Create the URL, ensure sanity
-    try {
-      urlToNotify = new URL(userUrl);
-    } catch (MalformedURLException mue) {
-      Log.warn("Job end notification couldn't parse " + userUrl, mue);
-      return;
-    }
-
-    // Send notification
-    boolean success = false;
-    while (numTries-- > 0 && !success) {
-      Log.info("Job end notification attempts left " + numTries);
-      success = notifyURLOnce();
-      if (!success) {
-        Thread.sleep(waitInterval);
-      }
-    }
-    if (!success) {
-      Log.warn("Job end notification failed to notify : " + urlToNotify);
-    } else {
-      Log.info("Job end notification succeeded for " + jobReport.getJobId());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index fccfe91..0cf14eb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -35,9 +35,6 @@ public enum VertexEventType {
   V_SOURCE_TASK_ATTEMPT_COMPLETED,
   V_SOURCE_VERTEX_STARTED,
   
-  //Producer:Speculator
-  V_ADD_SPEC_ATTEMPT,
-
   //Producer:Task
   V_TASK_COMPLETED,
   V_TASK_RESCHEDULED,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index d4012f2..53ba8ad 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -734,20 +734,6 @@ public class TaskAttemptImpl implements TaskAttempt,
 //    }
   }
 
-//  private void maybeSendSpeculatorContainerRequired() {
-//    if (!speculatorContainerRequestSent) {
-//      sendEvent(new SpeculatorEvent(getID().getTaskID(), +1));
-//      speculatorContainerRequestSent = true;
-//    }
-//  }
-//
-//  private void maybeSendSpeculatorContainerNoLongerRequired() {
-//    if (speculatorContainerRequestSent) {
-//      sendEvent(new SpeculatorEvent(getID().getTaskID(), -1));
-//      speculatorContainerRequestSent = false;
-//    }
-//  }
-
   private void sendTaskAttemptCleanupEvent() {
 //    TaskAttemptContext taContext =
 //        new TaskAttemptContextImpl(this.conf,
@@ -821,8 +807,6 @@ public class TaskAttemptImpl implements TaskAttempt,
     @Override
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
       TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) event;
-      // Event to speculator - containerNeeded++
-      //ta.maybeSendSpeculatorContainerRequired();
 
       // TODO Creating the remote task here may not be required in case of
       // recovery.
@@ -965,11 +949,6 @@ public class TaskAttemptImpl implements TaskAttempt,
         }
       }
 
-      // Inform the speculator about the container assignment.
-      //ta.maybeSendSpeculatorContainerNoLongerRequired();
-      // Inform speculator about startTime
-      //ta.sendEvent(new SpeculatorEvent(ta.attemptId, true, ta.launchTime));
-
       // Inform the Task
       ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
           TaskEventType.T_ATTEMPT_LAUNCHED));
@@ -998,8 +977,6 @@ public class TaskAttemptImpl implements TaskAttempt,
         ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, helper
             .getTaskAttemptState()));
       }
-      // Decrement speculator container request.
-      //ta.maybeSendSpeculatorContainerNoLongerRequired();
     }
   }
 
@@ -1057,9 +1034,6 @@ public class TaskAttemptImpl implements TaskAttempt,
       ta.reportedStatus.progress = statusEvent.getProgress();
       ta.reportedStatus.counters = statusEvent.getCounters();
 
-      // Inform speculator of status.
-      //ta.sendEvent(new SpeculatorEvent(ta.reportedStatus, ta.clock.getTime()));
-
       ta.updateProgressSplits();
 
     }
@@ -1083,8 +1057,6 @@ public class TaskAttemptImpl implements TaskAttempt,
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
 
       ta.setFinishTime();
-      // Inform the speculator.
-      //ta.sendEvent(new SpeculatorEvent(ta.reportedStatus, ta.finishTime));
       // Send out history event.
       ta.logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
       ta.sendEvent(createJobCounterUpdateEventSlotMillis(ta));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DataStatistics.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DataStatistics.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DataStatistics.java
deleted file mode 100644
index 48a131a..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DataStatistics.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.speculate;
-
-public class DataStatistics {
-  private int count = 0;
-  private double sum = 0;
-  private double sumSquares = 0;
-
-  public DataStatistics() {
-  }
-
-  public DataStatistics(double initNum) {
-    this.count = 1;
-    this.sum = initNum;
-    this.sumSquares = initNum * initNum;
-  }
-
-  public synchronized void add(double newNum) {
-    this.count++;
-    this.sum += newNum;
-    this.sumSquares += newNum * newNum;
-  }
-
-  public synchronized void updateStatistics(double old, double update) {
-	this.sum += update - old;
-	this.sumSquares += (update * update) - (old * old);
-  }
-
-  public synchronized double mean() {
-    return count == 0 ? 0.0 : sum/count;
-  }
-
-  public synchronized double var() {
-    // E(X^2) - E(X)^2
-    if (count <= 1) {
-      return 0.0;
-    }
-    double mean = mean();
-    return Math.max((sumSquares/count) - mean * mean, 0.0d);
-  }
-
-  public synchronized double std() {
-    return Math.sqrt(this.var());
-  }
-
-  public synchronized double outlier(float sigma) {
-    if (count != 0.0) {
-      return mean() + std() * sigma;
-    }
-
-    return 0.0;
-  }
-
-  public synchronized double count() {
-    return count;
-  }
-
-  public String toString() {
-    return "DataStatistics: count is " + count + ", sum is " + sum +
-    ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
deleted file mode 100644
index 24948c0..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
+++ /dev/null
@@ -1,504 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.speculate;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
-import org.apache.tez.dag.app.dag.event.TaskEvent;
-import org.apache.tez.dag.app.dag.event.TaskEventType;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.dag.utils.TezBuilderUtils;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-
-// FIXME does not handle multiple vertices
-public class DefaultSpeculator extends AbstractService implements
-    Speculator {
-
-  private static final long ON_SCHEDULE = Long.MIN_VALUE;
-  private static final long ALREADY_SPECULATING = Long.MIN_VALUE + 1;
-  private static final long TOO_NEW = Long.MIN_VALUE + 2;
-  private static final long PROGRESS_IS_GOOD = Long.MIN_VALUE + 3;
-  private static final long NOT_RUNNING = Long.MIN_VALUE + 4;
-  private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5;
-
-  private static final long SOONEST_RETRY_AFTER_NO_SPECULATE = 1000L * 1L;
-  private static final long SOONEST_RETRY_AFTER_SPECULATE = 1000L * 15L;
-
-  private static final double PROPORTION_RUNNING_TASKS_SPECULATABLE = 0.1;
-  private static final double PROPORTION_TOTAL_TASKS_SPECULATABLE = 0.01;
-  private static final int  MINIMUM_ALLOWED_SPECULATIVE_TASKS = 10;
-
-  private static final Log LOG = LogFactory.getLog(DefaultSpeculator.class);
-
-  private final ConcurrentMap<TezTaskID, Boolean> runningTasks
-      = new ConcurrentHashMap<TezTaskID, Boolean>();
-
-  private final Map<Task, AtomicBoolean> pendingSpeculations
-      = new ConcurrentHashMap<Task, AtomicBoolean>();
-
-  // These are the current needs, not the initial needs.  For each job, these
-  //  record the number of attempts that exist and that are actively
-  //  waiting for a container [as opposed to running or finished]
-  // TODO handle multiple dags
-  private final ConcurrentMap<TezVertexID, AtomicInteger> vertexContainerNeeds
-      = new ConcurrentHashMap<TezVertexID, AtomicInteger>();
-
-  private final Set<TezTaskID> mayHaveSpeculated = new HashSet<TezTaskID>();
-
-  private final Configuration conf;
-  private AppContext context;
-  private Thread speculationBackgroundThread = null;
-  private BlockingQueue<SpeculatorEvent> eventQueue
-      = new LinkedBlockingQueue<SpeculatorEvent>();
-  private TaskRuntimeEstimator estimator;
-
-  private BlockingQueue<Object> scanControl = new LinkedBlockingQueue<Object>();
-
-  private final Clock clock;
-
-  private final EventHandler<TaskEvent> eventHandler;
-
-  public DefaultSpeculator(Configuration conf, AppContext context) {
-    this(conf, context, context.getClock());
-  }
-
-  public DefaultSpeculator(Configuration conf, AppContext context, Clock clock) {
-    this(conf, context, getEstimator(conf, context), clock);
-  }
-
-  static private TaskRuntimeEstimator getEstimator
-      (Configuration conf, AppContext context) {
-    TaskRuntimeEstimator estimator;
-
-    try {
-      // "yarn.mapreduce.job.task.runtime.estimator.class"
-      Class<? extends TaskRuntimeEstimator> estimatorClass
-          = conf.getClass(MRJobConfig.MR_AM_TASK_ESTIMATOR,
-                          LegacyTaskRuntimeEstimator.class,
-                          TaskRuntimeEstimator.class);
-
-      Constructor<? extends TaskRuntimeEstimator> estimatorConstructor
-          = estimatorClass.getConstructor();
-
-      estimator = estimatorConstructor.newInstance();
-
-      estimator.contextualize(conf, context);
-    } catch (InstantiationException ex) {
-      LOG.error("Can't make a speculation runtime extimator", ex);
-      throw new TezUncheckedException(ex);
-    } catch (IllegalAccessException ex) {
-      LOG.error("Can't make a speculation runtime extimator", ex);
-      throw new TezUncheckedException(ex);
-    } catch (InvocationTargetException ex) {
-      LOG.error("Can't make a speculation runtime extimator", ex);
-      throw new TezUncheckedException(ex);
-    } catch (NoSuchMethodException ex) {
-      LOG.error("Can't make a speculation runtime extimator", ex);
-      throw new TezUncheckedException(ex);
-    }
-
-  return estimator;
-  }
-
-  // This constructor is designed to be called by other constructors.
-  //  However, it's public because we do use it in the test cases.
-  // Normally we figure out our own estimator.
-  public DefaultSpeculator
-      (Configuration conf, AppContext context,
-       TaskRuntimeEstimator estimator, Clock clock) {
-    super(DefaultSpeculator.class.getName());
-
-    this.conf = conf;
-    this.context = context;
-    this.estimator = estimator;
-    this.clock = clock;
-    this.eventHandler = context.getEventHandler();
-  }
-
-/*   *************************************************************    */
-
-  // This is the task-mongering that creates the two new threads -- one for
-  //  processing events from the event queue and one for periodically
-  //  looking for speculation opportunities
-
-  @Override
-  public void serviceStart() {
-    Runnable speculationBackgroundCore
-        = new Runnable() {
-            @Override
-            public void run() {
-              while (!Thread.currentThread().isInterrupted()) {
-                long backgroundRunStartTime = clock.getTime();
-                try {
-                  int speculations = computeSpeculations();
-                  long mininumRecomp
-                      = speculations > 0 ? SOONEST_RETRY_AFTER_SPECULATE
-                                         : SOONEST_RETRY_AFTER_NO_SPECULATE;
-
-                  long wait = Math.max(mininumRecomp,
-                        clock.getTime() - backgroundRunStartTime);
-
-                  if (speculations > 0) {
-                    LOG.info("We launched " + speculations
-                        + " speculations.  Sleeping " + wait + " milliseconds.");
-                  }
-
-                  Object pollResult
-                      = scanControl.poll(wait, TimeUnit.MILLISECONDS);
-                } catch (InterruptedException e) {
-                  LOG.error("Background thread returning, interrupted : " + e);
-                  e.printStackTrace(System.out);
-                  return;
-                }
-              }
-            }
-          };
-    speculationBackgroundThread = new Thread
-        (speculationBackgroundCore, "DefaultSpeculator background processing");
-    speculationBackgroundThread.start();
-  }
-
-  @Override
-  public void serviceStop() {
-    // this could be called before background thread is established
-    if (speculationBackgroundThread != null) {
-      speculationBackgroundThread.interrupt();
-    }
-  }
-
-  @Override
-  public void handleAttempt(TaskAttemptStatusOld status) {
-    long timestamp = clock.getTime();
-    statusUpdate(status, timestamp);
-  }
-
-  // This section is not part of the Speculator interface; it's used only for
-  //  testing
-  public boolean eventQueueEmpty() {
-    return eventQueue.isEmpty();
-  }
-
-  // This interface is intended to be used only for test cases.
-  public void scanForSpeculations() {
-    LOG.info("We got asked to run a debug speculation scan.");
-    // debug
-    System.out.println("We got asked to run a debug speculation scan.");
-    System.out.println("There are " + scanControl.size()
-        + " events stacked already.");
-    scanControl.add(new Object());
-    Thread.yield();
-  }
-
-
-/*   *************************************************************    */
-
-  // This section contains the code that gets run for a SpeculatorEvent
-
-  private AtomicInteger containerNeed(TezTaskID taskID) {
-    TezVertexID vId = taskID.getVertexID();
-
-    AtomicInteger result = vertexContainerNeeds.get(vId);
-
-    if (result == null) {
-      vertexContainerNeeds.putIfAbsent(vId, new AtomicInteger(0));
-      result = vertexContainerNeeds.get(vId);
-    }
-
-    return result;
-  }
-
-  private synchronized void processSpeculatorEvent(SpeculatorEvent event) {
-    switch (event.getType()) {
-      case ATTEMPT_STATUS_UPDATE:
-        statusUpdate(event.getReportedStatus(), event.getTimestamp());
-        break;
-
-      case TASK_CONTAINER_NEED_UPDATE:
-      {
-        AtomicInteger need = containerNeed(event.getTaskID());
-        need.addAndGet(event.containersNeededChange());
-        break;
-      }
-
-      case ATTEMPT_START:
-      {
-        LOG.info("ATTEMPT_START " + event.getTaskID());
-        estimator.enrollAttempt
-            (event.getReportedStatus(), event.getTimestamp());
-        break;
-      }
-
-      case JOB_CREATE:
-      {
-        LOG.info("JOB_CREATE " + event.getJobID());
-        estimator.contextualize(getConfig(), context);
-        break;
-      }
-    }
-  }
-
-  /**
-   * Absorbs one TaskAttemptStatus
-   *
-   * @param reportedStatus the status report that we got from a task attempt
-   *        that we want to fold into the speculation data for this job
-   * @param timestamp the time this status corresponds to.  This matters
-   *        because statuses contain progress.
-   */
-  protected void statusUpdate(TaskAttemptStatusOld reportedStatus, long timestamp) {
-
-    String stateString = reportedStatus.taskState.toString();
-
-    TezTaskAttemptID attemptID = reportedStatus.id;
-    TezTaskID taskID = attemptID.getTaskID();
-    DAG job = context.getCurrentDAG();
-
-    if (job == null) {
-      return;
-    }
-
-    Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
-
-    if (task == null) {
-      return;
-    }
-
-    estimator.updateAttempt(reportedStatus, timestamp);
-
-    // If the task is already known to be speculation-bait, don't do anything
-    if (pendingSpeculations.get(task) != null) {
-      if (pendingSpeculations.get(task).get()) {
-        return;
-      }
-    }
-
-    if (stateString.equals(TaskAttemptState.RUNNING.name())) {
-      runningTasks.putIfAbsent(taskID, Boolean.TRUE);
-    } else {
-      runningTasks.remove(taskID, Boolean.TRUE);
-    }
-  }
-
-/*   *************************************************************    */
-
-// This is the code section that runs periodically and adds speculations for
-//  those jobs that need them.
-
-
-  // This can return a few magic values for tasks that shouldn't speculate:
-  //  returns ON_SCHEDULE if thresholdRuntime(taskID) says that we should not
-  //     consider speculating this task
-  //  returns ALREADY_SPECULATING if that is true.  This has priority.
-  //  returns TOO_NEW if our companion task hasn't gotten any information
-  //  returns PROGRESS_IS_GOOD if the task is sailing through
-  //  returns NOT_RUNNING if the task is not running
-  //
-  // All of these values are negative.  Any value that should be allowed to
-  //  speculate is 0 or positive.
-  private long speculationValue(TezTaskID taskID, long now) {
-    DAG job = context.getCurrentDAG();
-    Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
-    Map<TezTaskAttemptID, TaskAttempt> attempts = task.getAttempts();
-    long acceptableRuntime = Long.MIN_VALUE;
-    long result = Long.MIN_VALUE;
-
-    if (!mayHaveSpeculated.contains(taskID)) {
-      acceptableRuntime = estimator.thresholdRuntime(taskID);
-      if (acceptableRuntime == Long.MAX_VALUE) {
-        return ON_SCHEDULE;
-      }
-    }
-
-    TezTaskAttemptID runningTaskAttemptID = null;
-
-    int numberRunningAttempts = 0;
-
-    for (TaskAttempt taskAttempt : attempts.values()) {
-      if (taskAttempt.getState() == TaskAttemptState.RUNNING
-          || taskAttempt.getState() == TaskAttemptState.STARTING) {
-        if (++numberRunningAttempts > 1) {
-          return ALREADY_SPECULATING;
-        }
-        runningTaskAttemptID = taskAttempt.getID();
-
-        long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID);
-
-        long taskAttemptStartTime
-            = estimator.attemptEnrolledTime(runningTaskAttemptID);
-        if (taskAttemptStartTime > now) {
-          // This background process ran before we could process the task
-          //  attempt status change that chronicles the attempt start
-          return TOO_NEW;
-        }
-
-        long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;
-
-        long estimatedReplacementEndTime
-            = now + estimator.estimatedNewAttemptRuntime(taskID);
-
-        if (estimatedEndTime < now) {
-          return PROGRESS_IS_GOOD;
-        }
-
-        if (estimatedReplacementEndTime >= estimatedEndTime) {
-          return TOO_LATE_TO_SPECULATE;
-        }
-
-        result = estimatedEndTime - estimatedReplacementEndTime;
-      }
-    }
-
-    // If we are here, there's at most one task attempt.
-    if (numberRunningAttempts == 0) {
-      return NOT_RUNNING;
-    }
-
-
-
-    if (acceptableRuntime == Long.MIN_VALUE) {
-      acceptableRuntime = estimator.thresholdRuntime(taskID);
-      if (acceptableRuntime == Long.MAX_VALUE) {
-        return ON_SCHEDULE;
-      }
-    }
-
-    return result;
-  }
-
-  //Add attempt to a given Task.
-  protected void addSpeculativeAttempt(TezTaskID taskID) {
-    LOG.info
-        ("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
-    eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_ADD_SPEC_ATTEMPT));
-    mayHaveSpeculated.add(taskID);
-  }
-
-  @Override
-  public void handle(SpeculatorEvent event) {
-    processSpeculatorEvent(event);
-  }
-
-
-  private int maybeScheduleAMapSpeculation() {
-    return maybeScheduleASpeculation(0);
-  }
-
-  private int maybeScheduleAReduceSpeculation() {
-    return maybeScheduleASpeculation(1);
-  }
-
-  private int maybeScheduleASpeculation(int vertexId) {
-    int successes = 0;
-
-    long now = clock.getTime();
-
-    // FIXME this needs to be fixed for a DAG
-    // TODO handle multiple dags
-    for (ConcurrentMap.Entry<TezVertexID, AtomicInteger> vertexEntry :
-        vertexContainerNeeds.entrySet()) {
-      // This race conditon is okay.  If we skip a speculation attempt we
-      //  should have tried because the event that lowers the number of
-      //  containers needed to zero hasn't come through, it will next time.
-      // Also, if we miss the fact that the number of containers needed was
-      //  zero but increased due to a failure it's not too bad to launch one
-      //  container prematurely.
-      if (vertexEntry.getValue().get() > 0) {
-        continue;
-      }
-
-      int numberSpeculationsAlready = 0;
-      int numberRunningTasks = 0;
-
-      // loop through the tasks of the kind
-      DAG job = context.getCurrentDAG();
-
-      Map<TezTaskID, Task> tasks =
-          job.getVertex(TezBuilderUtils.newVertexID(job.getID(), vertexId)).getTasks();
-
-      int numberAllowedSpeculativeTasks
-          = (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,
-                           PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());
-
-      TezTaskID bestTaskID = null;
-      long bestSpeculationValue = -1L;
-
-      // this loop is potentially pricey.
-      // TODO track the tasks that are potentially worth looking at
-      for (Map.Entry<TezTaskID, Task> taskEntry : tasks.entrySet()) {
-        long mySpeculationValue = speculationValue(taskEntry.getKey(), now);
-
-        if (mySpeculationValue == ALREADY_SPECULATING) {
-          ++numberSpeculationsAlready;
-        }
-
-        if (mySpeculationValue != NOT_RUNNING) {
-          ++numberRunningTasks;
-        }
-
-        if (mySpeculationValue > bestSpeculationValue) {
-          bestTaskID = taskEntry.getKey();
-          bestSpeculationValue = mySpeculationValue;
-        }
-      }
-      numberAllowedSpeculativeTasks
-          = (int) Math.max(numberAllowedSpeculativeTasks,
-                           PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks);
-
-      // If we found a speculation target, fire it off
-      if (bestTaskID != null
-          && numberAllowedSpeculativeTasks > numberSpeculationsAlready) {
-        addSpeculativeAttempt(bestTaskID);
-        ++successes;
-      }
-    }
-
-    return successes;
-  }
-
-  private int computeSpeculations() {
-    // We'll try to issue one map and one reduce speculation per job per run
-    return maybeScheduleAMapSpeculation() + maybeScheduleAReduceSpeculation();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
deleted file mode 100644
index 10f217d..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.speculate;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-
-/**
- * This estimator exponentially smooths the rate of progress versus wallclock
- * time.  Conceivably we could write an estimator that smooths time per
- * unit progress, and get different results.
- */
-public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase {
-
-  private final ConcurrentMap<TezTaskAttemptID, AtomicReference<EstimateVector>> estimates
-      = new ConcurrentHashMap<TezTaskAttemptID, AtomicReference<EstimateVector>>();
-
-  private SmoothedValue smoothedValue;
-
-  private long lambda;
-
-  public enum SmoothedValue {
-    RATE, TIME_PER_UNIT_PROGRESS
-  }
-
-  ExponentiallySmoothedTaskRuntimeEstimator
-      (long lambda, SmoothedValue smoothedValue) {
-    super();
-    this.smoothedValue = smoothedValue;
-    this.lambda = lambda;
-  }
-
-  public ExponentiallySmoothedTaskRuntimeEstimator() {
-    super();
-  }
-
-  // immutable
-  private class EstimateVector {
-    final double value;
-    final float basedOnProgress;
-    final long atTime;
-
-    EstimateVector(double value, float basedOnProgress, long atTime) {
-      this.value = value;
-      this.basedOnProgress = basedOnProgress;
-      this.atTime = atTime;
-    }
-
-    EstimateVector incorporate(float newProgress, long newAtTime) {
-      if (newAtTime <= atTime || newProgress < basedOnProgress) {
-        return this;
-      }
-
-      double oldWeighting
-          = value < 0.0
-              ? 0.0 : Math.exp(((double) (newAtTime - atTime)) / lambda);
-
-      double newRead = (newProgress - basedOnProgress) / (newAtTime - atTime);
-
-      if (smoothedValue == SmoothedValue.TIME_PER_UNIT_PROGRESS) {
-        newRead = 1.0 / newRead;
-      }
-
-      return new EstimateVector
-          (value * oldWeighting + newRead * (1.0 - oldWeighting),
-           newProgress, newAtTime);
-    }
-  }
-
-  private void incorporateReading
-      (TezTaskAttemptID attemptID, float newProgress, long newTime) {
-    //TODO: Refactor this method, it seems more complicated than necessary.
-    AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
-
-    if (vectorRef == null) {
-      estimates.putIfAbsent(attemptID, new AtomicReference<EstimateVector>(null));
-      incorporateReading(attemptID, newProgress, newTime);
-      return;
-    }
-
-    EstimateVector oldVector = vectorRef.get();
-
-    if (oldVector == null) {
-      if (vectorRef.compareAndSet(null,
-             new EstimateVector(-1.0, 0.0F, Long.MIN_VALUE))) {
-        return;
-      }
-
-      incorporateReading(attemptID, newProgress, newTime);
-      return;
-    }
-
-    while (!vectorRef.compareAndSet
-            (oldVector, oldVector.incorporate(newProgress, newTime))) {
-      oldVector = vectorRef.get();
-    }
-  }
-
-  private EstimateVector getEstimateVector(TezTaskAttemptID attemptID) {
-    AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
-
-    if (vectorRef == null) {
-      return null;
-    }
-
-    return vectorRef.get();
-  }
-
-  @Override
-  public void contextualize(Configuration conf, AppContext context) {
-    super.contextualize(conf, context);
-
-    lambda
-        = conf.getLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS,
-            MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS);
-    smoothedValue
-        = conf.getBoolean(MRJobConfig.MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE, true)
-            ? SmoothedValue.RATE : SmoothedValue.TIME_PER_UNIT_PROGRESS;
-  }
-
-  @Override
-  public long estimatedRuntime(TezTaskAttemptID id) {
-    Long startTime = (Long) startTimes.get(id);
-
-    if (startTime == null) {
-      return -1L;
-    }
-
-    EstimateVector vector = getEstimateVector(id);
-
-    if (vector == null) {
-      return -1L;
-    }
-
-    long sunkTime = vector.atTime - startTime;
-
-    double value = vector.value;
-    float progress = vector.basedOnProgress;
-
-    if (value == 0) {
-      return -1L;
-    }
-
-    double rate = smoothedValue == SmoothedValue.RATE ? value : 1.0 / value;
-
-    if (rate == 0.0) {
-      return -1L;
-    }
-
-    double remainingTime = (1.0 - progress) / rate;
-
-    return sunkTime + (long)remainingTime;
-  }
-
-  @Override
-  public long runtimeEstimateVariance(TezTaskAttemptID id) {
-    return -1L;
-  }
-
-  @Override
-  public void updateAttempt(TaskAttemptStatusOld status, long timestamp) {
-    super.updateAttempt(status, timestamp);
-    TezTaskAttemptID attemptID = status.id;
-
-    float progress = status.progress;
-
-    incorporateReading(attemptID, progress, timestamp);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
deleted file mode 100644
index ff7564c..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.speculate;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
-import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-
-public class LegacyTaskRuntimeEstimator extends StartEndTimesBase {
-
-  private final Map<TaskAttempt, AtomicLong> attemptRuntimeEstimates
-      = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
-  private final ConcurrentHashMap<TaskAttempt, AtomicLong> attemptRuntimeEstimateVariances
-      = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
-
-  @Override
-  public void updateAttempt(TaskAttemptStatusOld status, long timestamp) {
-    super.updateAttempt(status, timestamp);
-
-
-    TezTaskAttemptID attemptID = status.id;
-    TezTaskID taskID = attemptID.getTaskID();
-    DAG job = context.getCurrentDAG();
-
-    if (job == null) {
-      return;
-    }
-
-    Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
-
-    if (task == null) {
-      return;
-    }
-
-    TaskAttempt taskAttempt = task.getAttempt(attemptID);
-
-    if (taskAttempt == null) {
-      return;
-    }
-
-    Long boxedStart = (Long) startTimes.get(attemptID);
-    long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
-
-    // We need to do two things.
-    //  1: If this is a completion, we accumulate statistics in the superclass
-    //  2: If this is not a completion, we learn more about it.
-
-    // This is not a completion, but we're cooking.
-    //
-    if (taskAttempt.getState() == TaskAttemptState.RUNNING) {
-      // See if this task is already in the registry
-      AtomicLong estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
-      AtomicLong estimateVarianceContainer
-          = attemptRuntimeEstimateVariances.get(taskAttempt);
-
-      if (estimateContainer == null) {
-        if (attemptRuntimeEstimates.get(taskAttempt) == null) {
-          attemptRuntimeEstimates.put(taskAttempt, new AtomicLong());
-
-          estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
-        }
-      }
-
-      if (estimateVarianceContainer == null) {
-        attemptRuntimeEstimateVariances.putIfAbsent(taskAttempt, new AtomicLong());
-        estimateVarianceContainer = attemptRuntimeEstimateVariances.get(taskAttempt);
-      }
-
-
-      long estimate = -1;
-      long varianceEstimate = -1;
-
-      // This code assumes that we'll never consider starting a third
-      //  speculative task attempt if two are already running for this task
-      if (start > 0 && timestamp > start) {
-        estimate = (long) ((timestamp - start) / Math.max(0.0001, status.progress));
-        varianceEstimate = (long) (estimate * status.progress / 10);
-      }
-      if (estimateContainer != null) {
-        estimateContainer.set(estimate);
-      }
-      if (estimateVarianceContainer != null) {
-        estimateVarianceContainer.set(varianceEstimate);
-      }
-    }
-  }
-
-  private long storedPerAttemptValue
-       (Map<TaskAttempt, AtomicLong> data, TezTaskAttemptID attemptID) {
-    TezTaskID taskID = attemptID.getTaskID();
-    DAG job = context.getCurrentDAG();
-
-    Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
-
-    if (task == null) {
-      return -1L;
-    }
-
-    TaskAttempt taskAttempt = task.getAttempt(attemptID);
-
-    if (taskAttempt == null) {
-      return -1L;
-    }
-
-    AtomicLong estimate = data.get(taskAttempt);
-
-    return estimate == null ? -1L : estimate.get();
-
-  }
-
-  @Override
-  public long estimatedRuntime(TezTaskAttemptID attemptID) {
-    return storedPerAttemptValue(attemptRuntimeEstimates, attemptID);
-  }
-
-  @Override
-  public long runtimeEstimateVariance(TezTaskAttemptID attemptID) {
-    return storedPerAttemptValue(attemptRuntimeEstimateVariances, attemptID);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java
deleted file mode 100644
index 9fa3b4b..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.speculate;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-
-
-/*
- * This class is provided solely as an exemplae of the values that mean
- *  that nothing needs to be computed.  It's not currently used.
- */
-public class NullTaskRuntimesEngine implements TaskRuntimeEstimator {
-  @Override
-  public void enrollAttempt(TaskAttemptStatusOld status, long timestamp) {
-    // no code
-  }
-
-  @Override
-  public long attemptEnrolledTime(TezTaskAttemptID attemptID) {
-    return Long.MAX_VALUE;
-  }
-
-  @Override
-  public void updateAttempt(TaskAttemptStatusOld status, long timestamp) {
-    // no code
-  }
-
-  @Override
-  public void contextualize(Configuration conf, AppContext context) {
-    // no code
-  }
-
-  @Override
-  public long thresholdRuntime(TezTaskID id) {
-    return Long.MAX_VALUE;
-  }
-
-  @Override
-  public long estimatedRuntime(TezTaskAttemptID id) {
-    return -1L;
-  }
-  @Override
-  public long estimatedNewAttemptRuntime(TezTaskID id) {
-    return -1L;
-  }
-
-  @Override
-  public long runtimeEstimateVariance(TezTaskAttemptID id) {
-    return -1L;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java
deleted file mode 100644
index d4d0b5a..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.speculate;
-
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
-
-/**
- * Speculator component. Task Attempts' status updates are sent to this
- * component. Concrete implementation runs the speculative algorithm and
- * sends the TaskEventType.T_ADD_ATTEMPT.
- *
- * An implementation also has to arrange for the jobs to be scanned from
- * time to time, to launch the speculations.
- */
-public interface Speculator
-              extends EventHandler<SpeculatorEvent> {
-
-  enum EventType {
-    ATTEMPT_STATUS_UPDATE,
-    ATTEMPT_START,
-    TASK_CONTAINER_NEED_UPDATE,
-    JOB_CREATE
-  }
-
-  // This will be implemented if we go to a model where the events are
-  //  processed within the TaskAttempts' state transitions' code.
-  public void handleAttempt(TaskAttemptStatusOld status);
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java
deleted file mode 100644
index 917abb6..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.speculate;
-
-import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-
-public class SpeculatorEvent extends AbstractEvent<Speculator.EventType> {
-
-  // valid for ATTEMPT_STATUS_UPDATE
-  private TaskAttemptStatusOld reportedStatus;
-
-  // valid for TASK_CONTAINER_NEED_UPDATE
-  private TezTaskID taskID;
-  private int containersNeededChange;
-  
-  // valid for CREATE_JOB
-  private TezDAGID dagId;
-
-  public SpeculatorEvent(TezDAGID dagId, long timestamp) {
-    super(Speculator.EventType.JOB_CREATE, timestamp);
-    this.dagId = dagId;
-  }
-
-  public SpeculatorEvent(TaskAttemptStatusOld reportedStatus, long timestamp) {
-    super(Speculator.EventType.ATTEMPT_STATUS_UPDATE, timestamp);
-    this.reportedStatus = reportedStatus;
-  }
-
-  public SpeculatorEvent(TezTaskAttemptID attemptID, boolean flag, long timestamp) {
-    super(Speculator.EventType.ATTEMPT_START, timestamp);
-    this.reportedStatus = new TaskAttemptStatusOld();
-    this.reportedStatus.id = attemptID;
-    this.taskID = attemptID.getTaskID();
-  }
-
-  /*
-   * This c'tor creates a TASK_CONTAINER_NEED_UPDATE event .
-   * We send a +1 event when a task enters a state where it wants a container,
-   *  and a -1 event when it either gets one or withdraws the request.
-   * The per job sum of all these events is the number of containers requested
-   *  but not granted.  The intent is that we only do speculations when the
-   *  speculation wouldn't compete for containers with tasks which need
-   *  to be run.
-   */
-  public SpeculatorEvent(TezTaskID taskID, int containersNeededChange) {
-    super(Speculator.EventType.TASK_CONTAINER_NEED_UPDATE);
-    this.taskID = taskID;
-    this.containersNeededChange = containersNeededChange;
-  }
-
-  public TaskAttemptStatusOld getReportedStatus() {
-    return reportedStatus;
-  }
-
-  public int containersNeededChange() {
-    return containersNeededChange;
-  }
-
-  public TezTaskID getTaskID() {
-    return taskID;
-  }
-  
-  public TezDAGID getJobID() {
-    return dagId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
deleted file mode 100644
index 68d1369..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.speculate;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-
-abstract class StartEndTimesBase<V> implements TaskRuntimeEstimator {
-  static final float MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE
-      = 0.05F;
-  static final int MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
-      = 1;
-
-  protected Configuration conf = null;
-  protected AppContext context = null;
-
-  protected final Map<TezTaskAttemptID, Long> startTimes
-      = new ConcurrentHashMap<TezTaskAttemptID, Long>();
-
-  // XXXX This class design assumes that the contents of AppContext.getAllJobs
-  //   never changes.  Is that right?
-  //
-  // This assumption comes in in several places, mostly in data structure that
-  //   can grow without limit if a AppContext gets new Job's when the old ones
-  //   run out.  Also, these mapper statistics blocks won't cover the Job's
-  //   we don't know about.
-  // TODO handle multiple DAGs
-  protected final Map<TezVertexID, DataStatistics> vertexStatistics
-      = new HashMap<TezVertexID, DataStatistics>();
-
-  private float slowTaskRelativeTresholds = 0f;
-
-  protected final Set<Task> doneTasks = new HashSet<Task>();
-
-  @Override
-  public void enrollAttempt(TaskAttemptStatusOld status, long timestamp) {
-    startTimes.put(status.id,timestamp);
-  }
-
-  @Override
-  public long attemptEnrolledTime(TezTaskAttemptID attemptID) {
-    Long result = startTimes.get(attemptID);
-
-    return result == null ? Long.MAX_VALUE : result;
-  }
-
-
-  @Override
-  public void contextualize(Configuration conf, AppContext context) {
-    this.conf = conf;
-    this.context = context;
-
-
-    final DAG dag = context.getCurrentDAG();
-    for (Entry<TezVertexID, Vertex> entry: dag.getVertices().entrySet()) {
-      vertexStatistics.put(entry.getKey(), new DataStatistics());
-      slowTaskRelativeTresholds =
-          conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD, 1.0f);
-    }
-  }
-
-  protected DataStatistics dataStatisticsForTask(TezTaskID taskID) {
-    DAG dag = context.getCurrentDAG();
-
-    if (dag == null) {
-      return null;
-    }
-
-    Task task = dag.getVertex(taskID.getVertexID()).getTask(taskID);
-
-    if (task == null) {
-      return null;
-    }
-
-    return vertexStatistics.get(taskID.getVertexID());
-  }
-
-  @Override
-  public long thresholdRuntime(TezTaskID taskID) {
-    DAG job = context.getCurrentDAG();
-
-    DataStatistics statistics = dataStatisticsForTask(taskID);
-
-    Vertex v = job.getVertex(taskID.getVertexID());
-    int completedTasksOfType = v.getCompletedTasks();
-    int totalTasksOfType = v.getTotalTasks();
-
-    if (completedTasksOfType < MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
-        || (((float)completedTasksOfType) / totalTasksOfType)
-              < MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE ) {
-      return Long.MAX_VALUE;
-    }
-
-    long result =  statistics == null
-        ? Long.MAX_VALUE
-        : (long)statistics.outlier(slowTaskRelativeTresholds);
-    return result;
-  }
-
-  @Override
-  public long estimatedNewAttemptRuntime(TezTaskID id) {
-    DataStatistics statistics = dataStatisticsForTask(id);
-
-    if (statistics == null) {
-      return -1L;
-    }
-
-    return (long)statistics.mean();
-  }
-
-  @Override
-  public void updateAttempt(TaskAttemptStatusOld status, long timestamp) {
-
-    TezTaskAttemptID attemptID = status.id;
-    TezTaskID taskID = attemptID.getTaskID();
-    DAG job = context.getCurrentDAG();
-
-    if (job == null) {
-      return;
-    }
-
-    Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
-
-    if (task == null) {
-      return;
-    }
-
-    Long boxedStart = startTimes.get(attemptID);
-    long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
-
-    TaskAttempt taskAttempt = task.getAttempt(attemptID);
-
-    if (taskAttempt.getState() == TaskAttemptState.SUCCEEDED) {
-      boolean isNew = false;
-      // is this  a new success?
-      synchronized (doneTasks) {
-        if (!doneTasks.contains(task)) {
-          doneTasks.add(task);
-          isNew = true;
-        }
-      }
-
-      // It's a new completion
-      // Note that if a task completes twice [because of a previous speculation
-      //  and a race, or a success followed by loss of the machine with the
-      //  local data] we only count the first one.
-      if (isNew) {
-        long finish = timestamp;
-        if (start > 1L && finish > 1L && start <= finish) {
-          long duration = finish - start;
-
-          DataStatistics statistics
-          = dataStatisticsForTask(taskID);
-
-          if (statistics != null) {
-            statistics.add(duration);
-          }
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java
deleted file mode 100644
index a68dc50..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.speculate;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-
-
-
-public interface TaskRuntimeEstimator {
-  public void enrollAttempt(TaskAttemptStatusOld reportedStatus, long timestamp);
-
-  public long attemptEnrolledTime(TezTaskAttemptID attemptID);
-
-  public void updateAttempt(TaskAttemptStatusOld reportedStatus, long timestamp);
-
-  public void contextualize(Configuration conf, AppContext context);
-
-  /**
-   *
-   * Find a maximum reasonable execution wallclock time.  Includes the time
-   * already elapsed.
-   *
-   * Find a maximum reasonable execution time.  Includes the time
-   * already elapsed.  If the projected total execution time for this task
-   * ever exceeds its reasonable execution time, we may speculate it.
-   *
-   * @param id the {@link TezTaskID} of the task we are asking about
-   * @return the task's maximum reasonable runtime, or MAX_VALUE if
-   *         we don't have enough information to rule out any runtime,
-   *         however long.
-   *
-   */
-  public long thresholdRuntime(TezTaskID id);
-
-  /**
-   *
-   * Estimate a task attempt's total runtime.  Includes the time already
-   * elapsed.
-   *
-   * @param id the {@link TezTaskAttemptID} of the attempt we are asking about
-   * @return our best estimate of the attempt's runtime, or {@code -1} if
-   *         we don't have enough information yet to produce an estimate.
-   *
-   */
-  public long estimatedRuntime(TezTaskAttemptID id);
-
-  /**
-   *
-   * Estimates how long a new attempt on this task will take if we start
-   *  one now
-   *
-   * @param id the {@link TezTaskID} of the task we are asking about
-   * @return our best estimate of a new attempt's runtime, or {@code -1} if
-   *         we don't have enough information yet to produce an estimate.
-   *
-   */
-  public long estimatedNewAttemptRuntime(TezTaskID id);
-
-  /**
-   *
-   * Computes the width of the error band of our estimate of the task
-   *  runtime as returned by {@link #estimatedRuntime(TezTaskAttemptID)}
-   *
-   * @param id the {@link TezTaskAttemptID} of the attempt we are asking about
-   * @return our best estimate of the attempt's runtime, or {@code -1} if
-   *         we don't have enough information yet to produce an estimate.
-   *
-   */
-  public long runtimeEstimateVariance(TezTaskAttemptID id);
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskSpeculationPredicate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskSpeculationPredicate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskSpeculationPredicate.java
deleted file mode 100644
index ae2c612..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskSpeculationPredicate.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.speculate;
-
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.records.TezTaskID;
-
-
-public class TaskSpeculationPredicate {
-  boolean canSpeculate(AppContext context, TezTaskID taskID) {
-    // This class rejects speculating any task that already has speculations,
-    //  or isn't running.
-    //  Subclasses should call TaskSpeculationPredicate.canSpeculate(...) , but
-    //  can be even more restrictive.
-    // TODO handle multiple dags
-    DAG job = context.getCurrentDAG();
-    Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
-    return task.getAttempts().size() == 1;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/package-info.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/package-info.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/package-info.java
deleted file mode 100644
index 8b6d62a..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-@InterfaceAudience.Private
-package org.apache.tez.dag.app.speculate;
-import org.apache.hadoop.classification.InterfaceAudience;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 7dd6df0..b156c11 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -417,32 +417,6 @@ public class TestTaskImpl {
 
     assertTaskSucceededState();
   }
-
-  @Test
-  public void testSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
-    TezTaskID taskId = getNewTaskID();
-    scheduleTaskAttempt(taskId);
-    launchTaskAttempt(mockTask.getLastAttempt().getID());
-    updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
-
-    // Add a speculative task attempt that succeeds
-    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
-        TaskEventType.T_ADD_SPEC_ATTEMPT));
-    launchTaskAttempt(mockTask.getLastAttempt().getID());
-    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
-        TaskEventType.T_ATTEMPT_SUCCEEDED));
-
-    // The task should now have succeeded
-    assertTaskSucceededState();
-
-    // Now fail the first task attempt, after the second has succeeded
-    mockTask.handle(new TaskEventTAUpdate(mockTask.getAttemptList().get(0)
-        .getID(), TaskEventType.T_ATTEMPT_FAILED));
-
-    // The task should still be in the succeeded state
-    assertTaskSucceededState();
-
-  }
   
   @SuppressWarnings("rawtypes")
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
index 7a9cafb..45de43a 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
@@ -79,11 +79,6 @@ public interface MRJobConfig {
 
   public static final String SKIP_OUTDIR = "mapreduce.job.skip.outdir";
 
-  public static final String SPECULATIVE_SLOWNODE_THRESHOLD = "mapreduce.job.speculative.slownodethreshold";
-
-  public static final String SPECULATIVE_SLOWTASK_THRESHOLD = "mapreduce.job.speculative.slowtaskthreshold";
-
-  public static final String SPECULATIVECAP = "mapreduce.job.speculative.speculativecap";
 
   public static final String JOB_LOCAL_DIR = "mapreduce.job.local.dir";
 
@@ -201,8 +196,6 @@ public interface MRJobConfig {
 
   public static final String MAP_DEBUG_SCRIPT = "mapreduce.map.debug.script";
 
-  public static final String MAP_SPECULATIVE = "mapreduce.map.speculative";
-
   public static final String MAP_FAILURES_MAX_PERCENT = "mapreduce.map.failures.maxpercent";
 
   public static final String MAP_SKIP_INCR_PROC_COUNT = "mapreduce.map.skip.proc-count.auto-incr";
@@ -266,8 +259,6 @@ public interface MRJobConfig {
 
   public static final String REDUCE_DEBUG_SCRIPT = "mapreduce.reduce.debug.script";
 
-  public static final String REDUCE_SPECULATIVE = "mapreduce.reduce.speculative";
-
   public static final String SHUFFLE_CONNECT_TIMEOUT = "mapreduce.reduce.shuffle.connect.timeout";
 
   public static final String SHUFFLE_READ_TIMEOUT = "mapreduce.reduce.shuffle.read.timeout";
@@ -442,10 +433,6 @@ public interface MRJobConfig {
     MR_AM_PREFIX  + "job.reduce.rampup.limit";
   public static final float DEFAULT_MR_AM_JOB_REDUCE_RAMP_UP_LIMIT = 0.5f;
 
-  /** The class that should be used for speculative execution calculations.*/
-  public static final String MR_AM_JOB_SPECULATOR =
-    MR_AM_PREFIX + "job.speculator.class";
-
   /** Class used to estimate task resource needs.*/
   public static final String MR_AM_TASK_ESTIMATOR =
     MR_AM_PREFIX + "job.task.estimator.class";
@@ -583,27 +570,6 @@ public interface MRJobConfig {
   public static final String APPLICATION_ATTEMPT_ID =
       "mapreduce.job.application.attempt.id";
 
-  /**
-   * Job end notification.
-   */
-  public static final String MR_JOB_END_NOTIFICATION_URL =
-    "mapreduce.job.end-notification.url";
-
-  public static final String MR_JOB_END_NOTIFICATION_PROXY =
-    "mapreduce.job.end-notification.proxy";
-
-  public static final String MR_JOB_END_RETRY_ATTEMPTS =
-    "mapreduce.job.end-notification.retry.attempts";
-
-  public static final String MR_JOB_END_RETRY_INTERVAL =
-    "mapreduce.job.end-notification.retry.interval";
-
-  public static final String MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS =
-    "mapreduce.job.end-notification.max.attempts";
-
-  public static final String MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL =
-    "mapreduce.job.end-notification.max.retry.interval";
-
   /*
    * MR AM Service Authorization
    */