You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by je...@apache.org on 2019/11/05 21:48:40 UTC

[hadoop] branch branch-2 updated: MAPREDUCE-7208. Tuning TaskRuntimeEstimator. (Ahmed Hussein via jeagles)

This is an automated email from the ASF dual-hosted git repository.

jeagles pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new c59b1b6  MAPREDUCE-7208. Tuning TaskRuntimeEstimator. (Ahmed Hussein via jeagles)
c59b1b6 is described below

commit c59b1b66a5264c3164ac6c9356b51de9b5349c66
Author: Ahmed Hussein <ah...@apache.org>
AuthorDate: Tue Nov 5 15:48:27 2019 -0600

    MAPREDUCE-7208. Tuning TaskRuntimeEstimator. (Ahmed Hussein via jeagles)
    
    Signed-off-by: Jonathan Eagles <je...@gmail.com>
---
 .../mapreduce/v2/app/speculate/DataStatistics.java |  18 +-
 .../v2/app/speculate/DefaultSpeculator.java        |   3 +-
 .../v2/app/speculate/NullTaskRuntimesEngine.java   |   5 +
 .../SimpleExponentialTaskRuntimeEstimator.java     | 170 ++++
 .../v2/app/speculate/StartEndTimesBase.java        |   8 +-
 .../v2/app/speculate/TaskRuntimeEstimator.java     |  13 +
 .../forecast/SimpleExponentialSmoothing.java       | 196 +++++
 .../mapreduce/v2/app/TestRuntimeEstimators.java    |   8 +
 .../forecast/TestSimpleExponentialForecast.java    | 120 +++
 .../org/apache/hadoop/mapreduce/MRJobConfig.java   |  31 +
 .../mapreduce/v2/TestSpeculativeExecOnCluster.java | 935 +++++++++++++++++++++
 .../v2/TestSpeculativeExecutionWithMRApp.java      |  46 +-
 12 files changed, 1546 insertions(+), 7 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DataStatistics.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DataStatistics.java
index cfaffaf..9f1c122 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DataStatistics.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DataStatistics.java
@@ -71,8 +71,22 @@ public class DataStatistics {
     return count;
   }
 
+  /**
+   * Computes the upper bound of the 95% confidence interval of the mean,
+   * using 1.96 standard errors as the margin.
+   *
+   * @return mean plus the margin, or 0.0 when count is at most 1
+   */
+  public synchronized double meanCI() {
+    if (count <= 1) {
+      return 0.0;
+    }
+    return mean() + (1.96 * std() / Math.sqrt(count));
+  }
+
   public String toString() {
-    return "DataStatistics: count is " + count + ", sum is " + sum +
-    ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std();
+    // Renders the raw accumulators followed by mean(), std() and meanCI().
+    return "DataStatistics: count is " + count + ", sum is " + sum
+        + ", sumSquares is " + sumSquares + " mean is " + mean()
+        + " std() is " + std() + ", meanCI() is " + meanCI();
   }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
index c6bc441..4ab8f71 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
@@ -414,7 +414,8 @@ public class DefaultSpeculator extends AbstractService implements
           if (estimatedRunTime == data.getEstimatedRunTime()
               && progress == data.getProgress()) {
             // Previous stats are same as same stats
-            if (data.notHeartbeatedInAWhile(now)) {
+            if (data.notHeartbeatedInAWhile(now)
+                || estimator.hasStagnatedProgress(runningTaskAttemptID, now)) {
               // Stats have stagnated for a while, simulate heart-beat.
               TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
               taskAttemptStatus.id = runningTaskAttemptID;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java
index 7211ff4..eb52bc7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java
@@ -69,4 +69,9 @@ public class NullTaskRuntimesEngine implements TaskRuntimeEstimator {
     return -1L;
   }
 
+  @Override
+  public boolean hasStagnatedProgress(TaskAttemptId id, long timeStamp) {
+    // This implementation never reports an attempt as stagnated.
+    return false;
+  }
+
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SimpleExponentialTaskRuntimeEstimator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SimpleExponentialTaskRuntimeEstimator.java
new file mode 100644
index 0000000..f244b20
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SimpleExponentialTaskRuntimeEstimator.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.speculate;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.speculate.forecast.SimpleExponentialSmoothing;
+
+/**
+ * A task Runtime Estimator based on exponential smoothing.
+ * <p>
+ * Every status update of an attempt is fed to a per-attempt
+ * {@link SimpleExponentialSmoothing} forecast of the progress rate, which is
+ * then used to extrapolate the total runtime of the attempt.
+ */
+public class SimpleExponentialTaskRuntimeEstimator extends StartEndTimesBase {
+  private final static long DEFAULT_ESTIMATE_RUNTIME = -1L;
+
+  /**
+   * Constant time used to calculate the smoothing exponential factor.
+   */
+  private long constTime;
+
+  /**
+   * Number of readings before we consider the estimate stable.
+   * Otherwise, the estimate will be skewed due to the initial estimate
+   */
+  private int skipCount;
+
+  /**
+   * Time window to automatically update the count of the skipCount. This is
+   * needed when a task stalls without any progress, causing the estimator to
+   * return -1 as an estimatedRuntime.
+   */
+  private long stagnatedWindow;
+
+  private final ConcurrentMap<TaskAttemptId,
+      AtomicReference<SimpleExponentialSmoothing>>
+      estimates = new ConcurrentHashMap<>();
+
+  /**
+   * Looks up the forecast registered for an attempt.
+   *
+   * @param attemptID the attempt to look up
+   * @return the forecast, or null when none has been registered yet
+   */
+  private SimpleExponentialSmoothing getForecastEntry(TaskAttemptId attemptID) {
+    AtomicReference<SimpleExponentialSmoothing> entryRef = estimates
+        .get(attemptID);
+    if (entryRef == null) {
+      return null;
+    }
+    return entryRef.get();
+  }
+
+  /**
+   * Feeds a progress reading to the forecast of an attempt, creating the
+   * forecast on first use. The reading is dropped when the start time of the
+   * attempt is not known yet.
+   *
+   * @param attemptID the attempt the reading belongs to
+   * @param newRawData the reported progress
+   * @param newTimeStamp the time of the reading
+   */
+  private void incorporateReading(TaskAttemptId attemptID,
+      float newRawData, long newTimeStamp) {
+    SimpleExponentialSmoothing foreCastEntry = getForecastEntry(attemptID);
+    if (foreCastEntry == null) {
+      Long tStartTime = startTimes.get(attemptID);
+      // skip if the startTime is not set yet
+      if (tStartTime == null) {
+        return;
+      }
+      estimates.putIfAbsent(attemptID,
+          new AtomicReference<>(SimpleExponentialSmoothing.createForecast(
+              constTime, skipCount, stagnatedWindow,
+              tStartTime)));
+      // Re-read instead of recursing: another thread may have won the race
+      // to register the entry, and entries are never removed from the map.
+      foreCastEntry = getForecastEntry(attemptID);
+    }
+    foreCastEntry.incorporateReading(newTimeStamp, newRawData);
+  }
+
+  /**
+   * Reads the smoothing parameters from the configuration. The stagnation
+   * window is never smaller than twice the smoothing time constant.
+   */
+  @Override
+  public void contextualize(Configuration conf, AppContext context) {
+    super.contextualize(conf, context);
+
+    constTime
+        = conf.getLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS,
+        MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS);
+
+    stagnatedWindow = Math.max(2 * constTime, conf.getLong(
+        MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_STAGNATED_MS,
+        MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_STAGNATED_MS));
+
+    skipCount = conf
+        .getInt(MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_SKIP_INITIALS,
+            MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_INITIALS);
+  }
+
+  /**
+   * Extrapolates the total runtime of an attempt: the time elapsed up to the
+   * last reading plus the remaining work divided by the forecast rate.
+   *
+   * @param id the attempt we are asking about
+   * @return the estimated runtime, or -1 when there is no usable forecast
+   */
+  @Override
+  public long estimatedRuntime(TaskAttemptId id) {
+    SimpleExponentialSmoothing foreCastEntry = getForecastEntry(id);
+    if (foreCastEntry == null) {
+      return DEFAULT_ESTIMATE_RUNTIME;
+    }
+    double forecast = foreCastEntry.getForecast();
+    // "!(forecast > 0.0)" also rejects NaN, which "forecast <= 0.0" let by.
+    if (!(forecast > 0.0) || Double.isInfinite(forecast)) {
+      return DEFAULT_ESTIMATE_RUNTIME;
+    }
+    // Clamp to [0, 1] so a progress above 1.0 cannot yield negative time.
+    double remainingWork =
+        Math.max(0.0, Math.min(1.0, 1.0 - foreCastEntry.getRawData()));
+    long remainingTime = (long) (remainingWork / forecast);
+    return remainingTime
+        + foreCastEntry.getTimeStamp()
+        - foreCastEntry.getStartTime();
+  }
+
+  /**
+   * Estimates the runtime of a fresh attempt from the task statistics.
+   *
+   * @param id the task we are asking about
+   * @return meanCI() plus the smaller of a quarter of it and half the
+   *         standard deviation, or -1 when there are no statistics
+   */
+  @Override
+  public long estimatedNewAttemptRuntime(TaskId id) {
+    DataStatistics statistics = dataStatisticsForTask(id);
+
+    if (statistics == null) {
+      return -1L;
+    }
+
+    double statsMeanCI = statistics.meanCI();
+    double expectedVal =
+        statsMeanCI + Math.min(statsMeanCI * 0.25, statistics.std() / 2);
+    return (long) (expectedVal);
+  }
+
+  /**
+   * Delegates to the forecast of the attempt; an attempt without a forecast
+   * is not considered stagnated.
+   */
+  @Override
+  public boolean hasStagnatedProgress(TaskAttemptId id, long timeStamp) {
+    SimpleExponentialSmoothing foreCastEntry = getForecastEntry(id);
+    if (foreCastEntry == null) {
+      return false;
+    }
+    return foreCastEntry.isDataStagnated(timeStamp);
+  }
+
+  /**
+   * @return -1 while the attempt has no forecast or only the default one,
+   *         0 otherwise
+   */
+  @Override
+  public long runtimeEstimateVariance(TaskAttemptId id) {
+    SimpleExponentialSmoothing forecastEntry = getForecastEntry(id);
+    if (forecastEntry == null) {
+      return DEFAULT_ESTIMATE_RUNTIME;
+    }
+    double forecast = forecastEntry.getForecast();
+    if (forecastEntry.isDefaultForecast(forecast)) {
+      return DEFAULT_ESTIMATE_RUNTIME;
+    }
+    //TODO: What is the best way to measure variance in runtime
+    return 0L;
+  }
+
+  /**
+   * Lets the base class record the update, then feeds the reported progress
+   * to the forecast of the attempt.
+   */
+  @Override
+  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+    super.updateAttempt(status, timestamp);
+    TaskAttemptId attemptID = status.id;
+
+    float progress = status.progress;
+
+    incorporateReading(attemptID, progress, timestamp);
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/StartEndTimesBase.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/StartEndTimesBase.java
index aee4821..0de2642 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/StartEndTimesBase.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/StartEndTimesBase.java
@@ -152,8 +152,7 @@ abstract class StartEndTimesBase implements TaskRuntimeEstimator {
     if (statistics == null) {
       return -1L;
     }
-
-    return (long)statistics.mean();
+    return (long) statistics.mean();
   }
 
   @Override
@@ -208,4 +207,9 @@ abstract class StartEndTimesBase implements TaskRuntimeEstimator {
       }
     }
   }
+
+  @Override
+  public boolean hasStagnatedProgress(TaskAttemptId id, long timeStamp) {
+    // Base behavior: no stagnation is reported; subclasses may override.
+    return false;
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/TaskRuntimeEstimator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/TaskRuntimeEstimator.java
index ce4825f..9cadefe 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/TaskRuntimeEstimator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/TaskRuntimeEstimator.java
@@ -87,4 +87,17 @@ public interface TaskRuntimeEstimator {
    *
    */
   public long runtimeEstimateVariance(TaskAttemptId id);
+
+  /**
+   * Returns true if the estimator has received no update records for the
+   * attempt within a threshold time window. This helps to identify task
+   * attempts that are stalled at the beginning of execution.
+   *
+   * @param id the {@link TaskAttemptId} of the attempt we are asking about
+   * @param timeStamp the time of the report we compare with
+   * @return true if the task attempt has no progress for a given time window
+   */
+  public boolean hasStagnatedProgress(TaskAttemptId id, long timeStamp);
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/forecast/SimpleExponentialSmoothing.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/forecast/SimpleExponentialSmoothing.java
new file mode 100644
index 0000000..e1ef7be
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/forecast/SimpleExponentialSmoothing.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.speculate.forecast;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Implementation of the static model for Simple exponential smoothing.
+ * <p>
+ * Each reading is converted into a rate (change of the raw data per time
+ * unit since the previous reading) and blended into the running forecast
+ * with a smoothing factor of {@code 1 - exp(-elapsed / timeConstant)}.
+ */
+public class SimpleExponentialSmoothing {
+  public final static double DEFAULT_FORECAST = -1.0;
+  private final int kMinimumReads;
+  private final long kStagnatedWindow;
+  private final long startTime;
+  private long timeConstant;
+
+  private AtomicReference<ForecastRecord> forecastRefEntry;
+
+  /**
+   * Creates a forecast that has not seen any reading yet.
+   *
+   * @param timeConstant the time constant of the smoothing factor
+   * @param skipCnt number of readings to take before a forecast is reported
+   * @param stagnatedWindow time without readings that counts as stagnation
+   * @param timeStamp the start time the first rate is measured from
+   * @return the new forecast
+   */
+  public static SimpleExponentialSmoothing createForecast(long timeConstant,
+      int skipCnt, long stagnatedWindow, long timeStamp) {
+    return new SimpleExponentialSmoothing(timeConstant, skipCnt,
+        stagnatedWindow, timeStamp);
+  }
+
+  SimpleExponentialSmoothing(long ktConstant, int skipCnt,
+      long stagnatedWindow, long timeStamp) {
+    kMinimumReads = skipCnt;
+    kStagnatedWindow = stagnatedWindow;
+    this.timeConstant = ktConstant;
+    this.startTime = timeStamp;
+    this.forecastRefEntry = new AtomicReference<ForecastRecord>(null);
+  }
+
+  /**
+   * One step of the smoothing; a new record is created for every accepted
+   * reading instead of updating the previous one in place.
+   */
+  private class ForecastRecord {
+    private double alpha;
+    private long timeStamp;
+    private double sample;
+    private double rawData;
+    private double forecast;
+    private double sseError;
+    private long myIndex;
+
+    ForecastRecord(double forecast, double rawData, long timeStamp) {
+      this(0.0, forecast, rawData, forecast, timeStamp, 0.0, 0);
+    }
+
+    ForecastRecord(double alpha, double sample, double rawData,
+        double forecast, long timeStamp, double accError, long index) {
+      this.timeStamp = timeStamp;
+      this.alpha = alpha;
+      this.sample = sample;
+      this.forecast = forecast;
+      this.rawData = rawData;
+      this.sseError = accError;
+      this.myIndex = index;
+    }
+
+    private double preProcessRawData(double rData, long newTime) {
+      return processRawData(this.rawData, this.timeStamp, rData, newTime);
+    }
+
+    /**
+     * Blends a new reading into the forecast.
+     *
+     * @param newTimeStamp the time of the reading
+     * @param rData the raw data of the reading
+     * @return the record including the reading, or this record when the
+     *         reading is not newer than the one already incorporated
+     */
+    public ForecastRecord append(long newTimeStamp, double rData) {
+      // Stale and duplicate timestamps are dropped: a zero interval would
+      // turn the sampled rate into a division by zero (Infinity or NaN)
+      // and poison every later forecast.
+      if (this.timeStamp >= newTimeStamp) {
+        return this;
+      }
+      double newSample = preProcessRawData(rData, newTimeStamp);
+      // Negative on purpose: it is the exponent of the decay term below.
+      long deltaTime = this.timeStamp - newTimeStamp;
+      if (this.myIndex == kMinimumReads) {
+        timeConstant = Math.max(timeConstant, newTimeStamp - startTime);
+      }
+      double smoothFactor =
+          1 - Math.exp(((double) deltaTime) / timeConstant);
+      double forecastVal =
+          smoothFactor * newSample + (1.0 - smoothFactor) * this.forecast;
+      double newSSEError =
+          this.sseError + Math.pow(newSample - this.forecast, 2);
+      return new ForecastRecord(smoothFactor, newSample, rData, forecastVal,
+          newTimeStamp, newSSEError, this.myIndex + 1);
+    }
+
+  }
+
+  /**
+   * @param timeStamp the time to compare the last reading with
+   * @return true when at most the minimum number of readings was taken and
+   *         the last one is older than the stagnation window
+   */
+  public boolean isDataStagnated(long timeStamp) {
+    ForecastRecord rec = forecastRefEntry.get();
+    if (rec != null && rec.myIndex <= kMinimumReads) {
+      return (rec.timeStamp + kStagnatedWindow) < timeStamp;
+    }
+    return false;
+  }
+
+  /**
+   * Computes the rate of change between two readings. The result is not
+   * finite when both readings carry the same time.
+   */
+  static double processRawData(double oldRawData, long oldTime,
+      double newRawData, long newTime) {
+    double rate = (newRawData - oldRawData) / (newTime - oldTime);
+    return rate;
+  }
+
+  /**
+   * Incorporates a reading into the forecast. Readings that are not newer
+   * than the start time (for the first reading) or than the previous
+   * reading are ignored, since no rate can be derived from them.
+   *
+   * @param timeStamp the time of the reading
+   * @param rawData the raw data of the reading
+   */
+  public void incorporateReading(long timeStamp, double rawData) {
+    ForecastRecord oldRec = forecastRefEntry.get();
+    if (oldRec == null) {
+      // No time has elapsed since the start, so there is no rate yet;
+      // seeding now would store Infinity/NaN as the initial forecast.
+      if (timeStamp <= startTime) {
+        return;
+      }
+      double oldForecast =
+          processRawData(0, startTime, rawData, timeStamp);
+      forecastRefEntry.compareAndSet(null,
+          new ForecastRecord(oldForecast, 0.0, startTime));
+      // Whoever won the race above, the reference is non-null from now on.
+      oldRec = forecastRefEntry.get();
+    }
+    while (!forecastRefEntry.compareAndSet(oldRec, oldRec.append(timeStamp,
+        rawData))) {
+      oldRec = forecastRefEntry.get();
+    }
+
+  }
+
+  /**
+   * @return the forecast once more than the minimum number of readings was
+   *         taken, {@link #DEFAULT_FORECAST} before that
+   */
+  public double getForecast() {
+    ForecastRecord rec = forecastRefEntry.get();
+    if (rec != null && rec.myIndex > kMinimumReads) {
+      return rec.forecast;
+    }
+    return DEFAULT_FORECAST;
+  }
+
+  public boolean isDefaultForecast(double value) {
+    return value == DEFAULT_FORECAST;
+  }
+
+  /**
+   * @return the accumulated squared error of the forecast, or
+   *         {@link #DEFAULT_FORECAST} when no reading was taken yet
+   */
+  public double getSSE() {
+    ForecastRecord rec = forecastRefEntry.get();
+    if (rec != null) {
+      return rec.sseError;
+    }
+    return DEFAULT_FORECAST;
+  }
+
+  public boolean isErrorWithinBound(double bound) {
+    double squaredErr = getSSE();
+    if (squaredErr < 0) {
+      return false;
+    }
+    return bound > squaredErr;
+  }
+
+  /**
+   * @return the raw data of the last reading, or {@link #DEFAULT_FORECAST}
+   *         when no reading was taken yet
+   */
+  public double getRawData() {
+    ForecastRecord rec = forecastRefEntry.get();
+    if (rec != null) {
+      return rec.rawData;
+    }
+    return DEFAULT_FORECAST;
+  }
+
+  /**
+   * @return the time of the last reading, or 0 when no reading was taken yet
+   */
+  public long getTimeStamp() {
+    ForecastRecord rec = forecastRefEntry.get();
+    if (rec != null) {
+      return rec.timeStamp;
+    }
+    return 0L;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public AtomicReference<ForecastRecord> getForecastRefEntry() {
+    return forecastRefEntry;
+  }
+
+  @Override
+  public String toString() {
+    String res = "NULL";
+    ForecastRecord rec = forecastRefEntry.get();
+    if (rec != null) {
+      res =  "rec.index = " + rec.myIndex + ", forecast t: " + rec.timeStamp +
+          ", forecast: " + rec.forecast
+          + ", sample: " + rec.sample + ", raw: " + rec.rawData + ", error: "
+          + rec.sseError + ", alpha: " + rec.alpha;
+    }
+    return res;
+  }
+
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
index 3c9e05d..9b26dbc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.SimpleExponentialTaskRuntimeEstimator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
@@ -255,6 +256,13 @@ public class TestRuntimeEstimators {
     coreTestEstimator(specificEstimator, 3);
   }
 
+  @Test
+  public void testSimpleExponentialEstimator() throws Exception {
+    // Runs the shared estimator scenario against the simple exponential one.
+    coreTestEstimator(new SimpleExponentialTaskRuntimeEstimator(), 3);
+  }
+
   int taskTypeSlots(TaskType type) {
     return type == TaskType.MAP ? MAP_SLOT_REQUIREMENT : REDUCE_SLOT_REQUIREMENT;
   }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/speculate/forecast/TestSimpleExponentialForecast.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/speculate/forecast/TestSimpleExponentialForecast.java
new file mode 100644
index 0000000..b669df7
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/speculate/forecast/TestSimpleExponentialForecast.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.speculate.forecast;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.util.ControlledClock;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Testing the statistical model of simple exponential estimator.
+ */
+public class TestSimpleExponentialForecast {
+  private static final Log LOG =
+      LogFactory.getLog(TestSimpleExponentialForecast.class);
+
+  /** Milliseconds the controlled clock advances before each reading. */
+  private static final long CLOCK_TICKS = 1000L;
+  private ControlledClock clock;
+
+  /**
+   * Picks the progress rate of the quarter the progress currently lies in.
+   * The index is clamped so that a progress of exactly 1.0 selects the last
+   * rate instead of reading past the end of the array.
+   */
+  private static double rateFor(double[] progressRates, double progress) {
+    int quarter = (int) (progress / 0.25);
+    return progressRates[Math.min(quarter, progressRates.length - 1)];
+  }
+
+  /**
+   * Feeds a constant progress rate to the forecaster.
+   *
+   * @return 0 when the accumulated squared error is below 1e-6, 1 otherwise
+   */
+  private int incTestSimpleExponentialForecast() {
+    clock = new ControlledClock();
+    clock.tickMsec(CLOCK_TICKS);
+    SimpleExponentialSmoothing forecaster =
+        new SimpleExponentialSmoothing(10000,
+            12, 10000, clock.getTime());
+
+    double progress = 0.0;
+
+    while (progress <= 1.0) {
+      clock.tickMsec(CLOCK_TICKS);
+      forecaster.incorporateReading(clock.getTime(), progress);
+      LOG.info("progress: " + progress + " --> " + forecaster.toString());
+      progress += 0.005;
+    }
+
+    return forecaster.getSSE() < Math.pow(10.0, -6) ? 0 : 1;
+  }
+
+  /**
+   * Feeds a progress rate that drops at every quarter of the progress.
+   *
+   * @return 0 when the accumulated squared error is below 1e-6, 1 otherwise
+   */
+  private int decTestSimpleExponentialForecast() {
+    clock = new ControlledClock();
+    clock.tickMsec(CLOCK_TICKS);
+    SimpleExponentialSmoothing forecaster =
+        new SimpleExponentialSmoothing(800,
+            12, 10000, clock.getTime());
+
+    double progress = 0.0;
+
+    double[] progressRates = new double[]{0.005, 0.004, 0.002, 0.001};
+    while (progress <= 1.0) {
+      clock.tickMsec(CLOCK_TICKS);
+      forecaster.incorporateReading(clock.getTime(), progress);
+      LOG.info("progress: " + progress + " --> " + forecaster.toString());
+      progress += rateFor(progressRates, progress);
+    }
+
+    return forecaster.getSSE() < Math.pow(10.0, -6) ? 0 : 1;
+  }
+
+  /**
+   * Feeds a progress rate that drops to zero in the last quarter; after more
+   * than 1000 readings the last rate of the array is used so that the
+   * progress can complete.
+   *
+   * @return 0 when the accumulated squared error is below 1e-6, 1 otherwise
+   */
+  private int zeroTestSimpleExponentialForecast() {
+    clock = new ControlledClock();
+    clock.tickMsec(CLOCK_TICKS);
+    SimpleExponentialSmoothing forecaster =
+        new SimpleExponentialSmoothing(800,
+            12, 10000, clock.getTime());
+
+    double progress = 0.0;
+
+    double[] progressRates = new double[]{0.005, 0.004, 0.002, 0.0, 0.003};
+    int progressInd = 0;
+    while (progress <= 1.0) {
+      clock.tickMsec(CLOCK_TICKS);
+      forecaster.incorporateReading(clock.getTime(), progress);
+      LOG.info("progress: " + progress + " --> " + forecaster.toString());
+      progress += progressInd++ > 1000
+          ? progressRates[4] : rateFor(progressRates, progress);
+    }
+
+    return forecaster.getSSE() < Math.pow(10.0, -6) ? 0 : 1;
+  }
+
+  @Test
+  public void testSimpleExponentialForecastLinearInc() throws Exception {
+    int res = incTestSimpleExponentialForecast();
+    // JUnit expects the expected value first, then the actual one.
+    Assert.assertEquals("We got the wrong estimate from simple exponential.",
+        0, res);
+  }
+
+  @Test
+  public void testSimpleExponentialForecastLinearDec() throws Exception {
+    int res = decTestSimpleExponentialForecast();
+    Assert.assertEquals("We got the wrong estimate from simple exponential.",
+        0, res);
+  }
+
+  @Test
+  public void testSimpleExponentialForecastZeros() throws Exception {
+    int res = zeroTestSimpleExponentialForecast();
+    Assert.assertEquals("We got the wrong estimate from simple exponential.",
+        0, res);
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 5a72def..37be797 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -795,6 +795,37 @@ public interface MRJobConfig {
   public static final String MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE =
     MR_AM_PREFIX + "job.task.estimator.exponential.smooth.rate";
 
+  /**
+   * The lambda value (time constant, in milliseconds) in the smoothing
+   * function of the task estimator.
+   */
+  String MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS =
+      MR_AM_PREFIX
+          + "job.task.estimator.simple.exponential.smooth.lambda-ms";
+  /** Default smoothing lambda: 120 seconds, expressed in milliseconds. */
+  long DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS = 1000L * 120;
+
+  /**
+   * The window length, in milliseconds, after which the simple exponential
+   * smoothing considers a task attempt stagnated.
+   */
+  String MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_STAGNATED_MS =
+      MR_AM_PREFIX
+          + "job.task.estimator.simple.exponential.smooth.stagnated-ms";
+  /** Default stagnation window: 360 seconds, expressed in milliseconds. */
+  long DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_STAGNATED_MS =
+      1000L * 360;
+
+  /**
+   * The number of initial readings that the estimator ignores before giving a
+   * prediction. At the beginning the smooth estimator won't be accurate in
+   * prediction.
+   */
+  String MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_SKIP_INITIALS =
+      MR_AM_PREFIX
+          + "job.task.estimator.simple.exponential.smooth.skip-initials";
+
+  /**
+   * The default number of readings the estimator is going to ignore before
+   * returning the smooth exponential prediction.
+   */
+  int DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_INITIALS = 24;
+
   /** The number of threads used to handle task RPC calls.*/
   public static final String MR_AM_TASK_LISTENER_THREAD_COUNT =
     MR_AM_PREFIX + "job.task.listener.thread-count";
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster.java
new file mode 100644
index 0000000..02e4358
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster.java
@@ -0,0 +1,935 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.SimpleExponentialTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Test speculation on Mini Cluster.
+ */
+@Ignore
+@RunWith(Parameterized.class)
+public class TestSpeculativeExecOnCluster {
+  private static final Log LOG = LogFactory
+      .getLog(TestSpeculativeExecOnCluster.class);
+
+  // Size of the mini cluster and the speculation switches used by the tests.
+  private static final int NODE_MANAGERS_COUNT = 2;
+  private static final boolean ENABLE_SPECULATIVE_MAP = true;
+  private static final boolean ENABLE_SPECULATIVE_REDUCE = true;
+
+  // Default shape of the sleep job: task counts, total sleep time per task
+  // and the number of records (sleep steps) each task processes.
+  private static final int NUM_MAP_DEFAULT = 8 * NODE_MANAGERS_COUNT;
+  private static final int NUM_REDUCE_DEFAULT = NUM_MAP_DEFAULT / 2;
+  private static final int MAP_SLEEP_TIME_DEFAULT = 60000;
+  private static final int REDUCE_SLEEP_TIME_DEFAULT = 10000;
+  private static final int MAP_SLEEP_COUNT_DEFAULT = 10000;
+  private static final int REDUCE_SLEEP_COUNT_DEFAULT = 1000;
+
+  // Configuration keys read by the sleep-job input format, mapper and reducer.
+  private static final String MAP_SLEEP_COUNT =
+      "mapreduce.sleepjob.map.sleep.count";
+  private static final String REDUCE_SLEEP_COUNT =
+      "mapreduce.sleepjob.reduce.sleep.count";
+  private static final String MAP_SLEEP_TIME =
+      "mapreduce.sleepjob.map.sleep.time";
+  private static final String REDUCE_SLEEP_TIME =
+      "mapreduce.sleepjob.reduce.sleep.time";
+  private static final String MAP_SLEEP_CALCULATOR_TYPE =
+      "mapreduce.sleepjob.map.sleep.time.calculator";
+  private static final String MAP_SLEEP_CALCULATOR_TYPE_DEFAULT = "normal_run";
+
+  // Maps a MAP_SLEEP_CALCULATOR_TYPE value to the calculator that shapes the
+  // per-record sleep duration of the mappers.
+  private static Map<String, SleepDurationCalculator> mapSleepTypeMapper;
+
+
+  private static FileSystem localFs;
+
+  static {
+    mapSleepTypeMapper = new HashMap<>();
+    mapSleepTypeMapper.put("normal_run", new SleepDurationCalcImpl());
+    mapSleepTypeMapper.put("stalled_run",
+        new StalledSleepDurationCalcImpl());
+    mapSleepTypeMapper.put("slowing_run",
+        new SlowingSleepDurationCalcImpl());
+    mapSleepTypeMapper.put("dynamic_slowing_run",
+        new DynamicSleepDurationCalcImpl());
+    mapSleepTypeMapper.put("step_stalled_run",
+        new StepStalledSleepDurationCalcImpl());
+    try {
+      localFs = FileSystem.getLocal(new Configuration());
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+  }
+
+  // Static initializers run in textual order: localFs must already have been
+  // assigned by the static block above when TEST_ROOT_DIR is evaluated, so
+  // these declarations must stay below that block.
+  private static final Path TEST_ROOT_DIR =
+      new Path("target",
+          TestSpeculativeExecOnCluster.class.getName() + "-tmpDir")
+          .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+  private static final Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
+  private static final Path TEST_OUT_DIR =
+      new Path(TEST_ROOT_DIR, "test.out.dir");
+
+  // Mini cluster started in setup() and stopped in tearDown().
+  private MiniMRYarnCluster mrCluster;
+
+  // Per-run sleep-job settings; task counts come from the test parameters,
+  // the remaining values are reset to their defaults in setup().
+  private int myNumMapper;
+  private int myNumReduce;
+  private int myMapSleepTime;
+  private int myReduceSleepTime;
+  private int myMapSleepCount;
+  private int myReduceSleepCount;
+  // Name of the sleep calculator scenario the current test exercises.
+  private String chosenSleepCalc;
+  // Estimator implementation under test (first test parameter).
+  private Class<?> estimatorClass;
+
+
+  /**
+   * Names of the sleep calculator scenarios to skip. The test cases take a
+   * long time to run all the estimators against all the cases, so a test
+   * returns immediately when its scenario name is in this list.
+   */
+  private List<String> ignoredTests;
+
+
+  /**
+   * Supplies one parameter row per estimator under test: the estimator class,
+   * the scenario names to skip, and the number of map and reduce tasks.
+   */
+  @Parameterized.Parameters(name = "{index}: TaskEstimator(EstimatorClass {0})")
+  public static Collection<Object[]> getTestParameters() {
+    final List<String> skippedScenarios =
+        Arrays.asList("stalled_run", "slowing_run", "step_stalled_run");
+    final Object[] simpleExponentialRow = {
+        SimpleExponentialTaskRuntimeEstimator.class, skippedScenarios,
+        NUM_MAP_DEFAULT, NUM_REDUCE_DEFAULT};
+    final Object[] legacyRow = {
+        LegacyTaskRuntimeEstimator.class, skippedScenarios,
+        NUM_MAP_DEFAULT, NUM_REDUCE_DEFAULT};
+    return Arrays.asList(new Object[][] {simpleExponentialRow, legacyRow});
+  }
+
+  /**
+   * Receives one row produced by {@link #getTestParameters()}.
+   *
+   * @param estimatorKlass the estimator implementation under test.
+   * @param testToIgnore names of the sleep calculator scenarios to skip.
+   * @param numMapper number of map tasks of the sleep job.
+   * @param numReduce number of reduce tasks of the sleep job.
+   */
+  public TestSpeculativeExecOnCluster(
+      Class<? extends TaskRuntimeEstimator> estimatorKlass,
+      List<String> testToIgnore,
+      Integer numMapper,
+      Integer numReduce) {
+    estimatorClass = estimatorKlass;
+    ignoredTests = testToIgnore;
+    myNumMapper = numMapper;
+    myNumReduce = numReduce;
+  }
+
+  @Before
+  public void setup() throws IOException {
+
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+
+    if (mrCluster == null) {
+      mrCluster = new MiniMRYarnCluster(
+          TestSpeculativeExecution.class.getName(), NODE_MANAGERS_COUNT);
+      Configuration conf = new Configuration();
+      mrCluster.init(conf);
+      mrCluster.start();
+
+    }
+
+    // workaround the absent public distcache.
+    localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
+    localFs.setPermission(APP_JAR, new FsPermission("700"));
+    myMapSleepTime = MAP_SLEEP_TIME_DEFAULT;
+    myReduceSleepTime = REDUCE_SLEEP_TIME_DEFAULT;
+    myMapSleepCount = MAP_SLEEP_COUNT_DEFAULT;
+    myReduceSleepCount = REDUCE_SLEEP_COUNT_DEFAULT;
+    chosenSleepCalc = MAP_SLEEP_CALCULATOR_TYPE_DEFAULT;
+  }
+
+  /** Stops the mini cluster, if one was started, and forgets it. */
+  @After
+  public void tearDown() {
+    final MiniMRYarnCluster running = mrCluster;
+    if (running == null) {
+      return;
+    }
+    running.stop();
+    mrCluster = null;
+  }
+
+  /**
+   * Partitioner for the test job: a key goes to the partition given by the
+   * remainder of its integer value divided by the number of partitions.
+   */
+  public static class SpeculativeSleepJobPartitioner extends
+      Partitioner<IntWritable, NullWritable> {
+    @Override
+    public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
+      final int keyValue = k.get();
+      return keyValue % numPartitions;
+    }
+  }
+
+  /**
+   * An input split that carries no data: zero length, no locations and
+   * nothing to serialize.
+   */
+  public static class EmptySplit extends InputSplit implements Writable {
+    @Override
+    public void write(DataOutput out) throws IOException {
+      // Nothing to write.
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      // Nothing to read.
+    }
+
+    @Override
+    public long getLength() {
+      return 0L;
+    }
+
+    @Override
+    public String[] getLocations() {
+      return new String[] {};
+    }
+  }
+
+  /**
+   * Input format that fabricates the records of the sleep job. It creates one
+   * {@link EmptySplit} per configured map task, and each record reader emits
+   * {@code MAP_SLEEP_COUNT} records whose values add up to
+   * {@code REDUCE_SLEEP_COUNT * numReduceTasks}.
+   */
+  public static class SpeculativeSleepInputFormat
+      extends InputFormat<IntWritable, IntWritable> {
+
+    /**
+     * @param jobContext supplies the configured number of maps.
+     * @return one empty split per map task (a single split if unset).
+     */
+    @Override
+    public List<InputSplit> getSplits(JobContext jobContext) {
+      List<InputSplit> ret = new ArrayList<InputSplit>();
+      int numSplits = jobContext.getConfiguration().
+          getInt(MRJobConfig.NUM_MAPS, 1);
+      for (int i = 0; i < numSplits; ++i) {
+        ret.add(new EmptySplit());
+      }
+      return ret;
+    }
+
+    /**
+     * @param ignored the split; it carries no data.
+     * @param taskContext supplies the sleep counts and the reducer count.
+     * @return a reader that generates the records of one map task.
+     * @throws IOException if a configured sleep count is negative.
+     */
+    @Override
+    public RecordReader<IntWritable, IntWritable> createRecordReader(
+        InputSplit ignored, TaskAttemptContext taskContext)
+        throws IOException {
+      Configuration conf = taskContext.getConfiguration();
+      final int count = conf.getInt(MAP_SLEEP_COUNT, MAP_SLEEP_COUNT_DEFAULT);
+      if (count < 0) {
+        throw new IOException("Invalid map count: " + count);
+      }
+      final int redcount = conf.getInt(REDUCE_SLEEP_COUNT,
+          REDUCE_SLEEP_COUNT_DEFAULT);
+      if (redcount < 0) {
+        throw new IOException("Invalid reduce count: " + redcount);
+      }
+      final int emitPerMapTask = (redcount * taskContext.getNumReduceTasks());
+
+      return new RecordReader<IntWritable, IntWritable>() {
+        private int records = 0;
+        private int emitCount = 0;
+        private IntWritable key = null;
+        private IntWritable value = null;
+
+        @Override
+        public void initialize(InputSplit split, TaskAttemptContext context) {
+        }
+
+        @Override
+        public boolean nextKeyValue()
+            throws IOException {
+          if (count == 0) {
+            return false;
+          }
+          key = new IntWritable();
+          key.set(emitCount);
+          // Spread emitPerMapTask over count records; the first
+          // (emitPerMapTask % count) records carry one extra item.
+          int emit = emitPerMapTask / count;
+          if ((emitPerMapTask) % count > records) {
+            ++emit;
+          }
+          emitCount += emit;
+          value = new IntWritable();
+          value.set(emit);
+          return records++ < count;
+        }
+
+        @Override
+        public IntWritable getCurrentKey() {
+          return key;
+        }
+
+        @Override
+        public IntWritable getCurrentValue() {
+          return value;
+        }
+
+        @Override
+        public void close() throws IOException { }
+
+        /**
+         * @return the fraction of records produced, within [0.0, 1.0]. An
+         *         empty reader is complete, and the value is clamped because
+         *         {@code records} is incremented once more by the call that
+         *         reports exhaustion.
+         */
+        @Override
+        public float getProgress() throws IOException {
+          if (count == 0) {
+            return 1.0f;
+          }
+          return Math.min(1.0f, records / ((float) count));
+        }
+      };
+    }
+  }
+
+  /**
+   * Interface used to simulate different progress rates of the tasks.
+   */
+  public interface SleepDurationCalculator {
+    /**
+     * Computes how long a task attempt sleeps while processing one record.
+     *
+     * @param taId the attempt asking for its sleep duration.
+     * @param currCount the number of records the attempt processed so far.
+     * @param totalCount the total number of records of the attempt.
+     * @param defaultSleepDuration the unmodified per-record sleep duration;
+     *        the mapper passes the result to {@code Thread.sleep}, so the
+     *        unit is milliseconds.
+     * @return the duration to sleep for the current record.
+     */
+    long calcSleepDuration(TaskAttemptID taId, int currCount, int totalCount,
+        long defaultSleepDuration);
+  }
+
+  /**
+   * Baseline calculator: no attempt is singled out, so all tasks progress at
+   * the same rate. With a slow factor of 1.0 the returned duration equals the
+   * default one.
+   */
+  public static class SleepDurationCalcImpl implements SleepDurationCalculator {
+
+    private double threshold = 1.0;
+    private double slowFactor = 1.0;
+
+    SleepDurationCalcImpl() {
+    }
+
+    @Override
+    public long calcSleepDuration(TaskAttemptID taId, int currCount,
+        int totalCount, long defaultSleepDuration) {
+      final double progress = ((double) currCount) / totalCount;
+      return threshold <= progress
+          ? (long) (slowFactor * defaultSleepDuration)
+          : defaultSleepDuration;
+    }
+  }
+
+  /**
+   * The first attempt of map task_0 slows down by a small factor (1.2x once
+   * 40% of its records are processed) that should not trigger a speculation.
+   * A speculated attempt should never beat the original task.
+   * A conservative estimator/speculator will speculate another attempt
+   * because of the slower progress.
+   */
+  public static class SlowingSleepDurationCalcImpl implements
+      SleepDurationCalculator {
+
+    private double threshold = 0.4;
+    private double slowFactor = 1.2;
+
+    SlowingSleepDurationCalcImpl() {
+    }
+
+    @Override
+    public long calcSleepDuration(TaskAttemptID taId, int currCount,
+        int totalCount, long defaultSleepDuration) {
+      final boolean firstAttemptOfFirstMap =
+          taId.getTaskType() == TaskType.MAP
+              && taId.getTaskID().getId() == 0
+              && taId.getId() == 0;
+      if (!firstAttemptOfFirstMap) {
+        return defaultSleepDuration;
+      }
+      final double progress = ((double) currCount) / totalCount;
+      if (threshold <= progress) {
+        return (long) (slowFactor * defaultSleepDuration);
+      }
+      return defaultSleepDuration;
+    }
+  }
+
+  /**
+   * The first attempt of the first Mapper task is stalled: it sleeps 1000
+   * times longer than the other tasks for every record.
+   * The speculated attempt should succeed if the estimator detects
+   * the slow down on time.
+   */
+  public static class StalledSleepDurationCalcImpl implements
+      SleepDurationCalculator {
+
+    StalledSleepDurationCalcImpl() {
+    }
+
+    @Override
+    public long calcSleepDuration(TaskAttemptID taId, int currCount,
+        int totalCount, long defaultSleepDuration) {
+      final boolean firstAttemptOfFirstMap =
+          taId.getTaskType() == TaskType.MAP
+              && taId.getTaskID().getId() == 0
+              && taId.getId() == 0;
+      return firstAttemptOfFirstMap
+          ? 1000 * defaultSleepDuration
+          : defaultSleepDuration;
+    }
+  }
+
+
+  /**
+   * Emulates a step change in the progress: the first attempt of map task_0
+   * runs normally until 40% of its records are processed and then sleeps
+   * 10000 times longer per record.
+   */
+  public static class StepStalledSleepDurationCalcImpl implements
+      SleepDurationCalculator {
+
+    private double threshold = 0.4;
+    private double slowFactor = 10000;
+
+    StepStalledSleepDurationCalcImpl() {
+    }
+
+    @Override
+    public long calcSleepDuration(TaskAttemptID taId, int currCount,
+        int totalCount, long defaultSleepDuration) {
+      if (taId.getTaskType() != TaskType.MAP) {
+        return defaultSleepDuration;
+      }
+      if (taId.getTaskID().getId() != 0 || taId.getId() != 0) {
+        return defaultSleepDuration;
+      }
+      final double progress = ((double) currCount) / totalCount;
+      return threshold <= progress
+          ? (long) (slowFactor * defaultSleepDuration)
+          : defaultSleepDuration;
+    }
+  }
+
+  /**
+   * Dynamically slows down the first attempt of the first Mapper task: each
+   * progress threshold the attempt has passed selects a larger slow factor.
+   * The speculated attempt should succeed if the estimator detects
+   * the slow down on time.
+   */
+  public static class DynamicSleepDurationCalcImpl implements
+      SleepDurationCalculator {
+
+    private double[] thresholds;
+    private double[] slowFactors;
+
+    DynamicSleepDurationCalcImpl() {
+      thresholds = new double[] {
+          0.1, 0.25, 0.4, 0.5, 0.6, 0.65, 0.7, 0.8, 0.9
+      };
+      slowFactors = new double[] {
+          2.0, 4.0, 5.0, 6.0, 10.0, 15.0, 20.0, 25.0, 30.0
+      };
+    }
+
+    @Override
+    public long calcSleepDuration(TaskAttemptID taId, int currCount,
+        int totalCount,
+        long defaultSleepDuration) {
+      final boolean firstAttemptOfFirstMap =
+          taId.getTaskType() == TaskType.MAP
+              && taId.getTaskID().getId() == 0
+              && taId.getId() == 0;
+      if (!firstAttemptOfFirstMap) {
+        return defaultSleepDuration;
+      }
+      final double progress = ((double) currCount) / totalCount;
+      double factor = 1.0;
+      int step = 0;
+      // Keep the last factor whose threshold lies strictly below the
+      // progress. The negated ">=" keeps the original comparison semantics.
+      while (step < thresholds.length && !(thresholds[step] >= progress)) {
+        factor = slowFactors[step];
+        step++;
+      }
+      return (long) (factor * defaultSleepDuration);
+    }
+  }
+
+  /**
+   * Mapper used to test speculation. For every input record it sleeps for a
+   * duration chosen by the configured {@link SleepDurationCalculator} and
+   * then writes {@code value} consecutive keys starting at {@code key}.
+   * The calculator can slow down a given attempt so that the estimator
+   * under test can be evaluated.
+   */
+  public static class SpeculativeSleepMapper
+      extends Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
+    private long mapSleepDuration = MAP_SLEEP_TIME_DEFAULT;
+    private int mapSleepCount = 1;
+    private int count = 0;
+    private SleepDurationCalculator sleepCalc = new SleepDurationCalcImpl();
+
+    /**
+     * Reads the sleep settings and selects the sleep calculator.
+     *
+     * @throws IOException if the configured calculator type is unknown.
+     */
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      this.mapSleepCount =
+          conf.getInt(MAP_SLEEP_COUNT, mapSleepCount);
+      this.mapSleepDuration = mapSleepCount == 0 ? 0 :
+          conf.getLong(MAP_SLEEP_TIME, MAP_SLEEP_TIME_DEFAULT) / mapSleepCount;
+      String calcType = conf.get(MAP_SLEEP_CALCULATOR_TYPE,
+          MAP_SLEEP_CALCULATOR_TYPE_DEFAULT);
+      SleepDurationCalculator chosen = mapSleepTypeMapper.get(calcType);
+      if (chosen == null) {
+        // Fail here with a clear message instead of with a
+        // NullPointerException on the first call to map().
+        throw new IOException(
+            "Unknown map sleep calculator type: " + calcType);
+      }
+      this.sleepCalc = chosen;
+    }
+
+    /**
+     * Sleeps for the calculated duration, then emits {@code value} keys.
+     *
+     * @throws IOException if the sleep is interrupted or the write fails.
+     */
+    @Override
+    public void map(IntWritable key, IntWritable value, Context context)
+        throws IOException, InterruptedException {
+      //it is expected that every map processes mapSleepCount number of records.
+      try {
+        context.setStatus("Sleeping... (" +
+            (mapSleepDuration * (mapSleepCount - count)) + ") ms left");
+        long sleepTime = sleepCalc.calcSleepDuration(context.getTaskAttemptID(),
+            count, mapSleepCount,
+            mapSleepDuration);
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException ex) {
+        throw (IOException) new IOException(
+            "Interrupted while sleeping").initCause(ex);
+      }
+      ++count;
+      // output reduceSleepCount * numReduce number of random values, so that
+      // each reducer will get reduceSleepCount number of keys.
+      int k = key.get();
+      for (int i = 0; i < value.get(); ++i) {
+        context.write(new IntWritable(k + i), NullWritable.get());
+      }
+    }
+  }
+
+  /**
+   * Reducer of the test job: sleeps a fixed duration for every key it
+   * receives and produces no output.
+   */
+  public static class SpeculativeSleepReducer
+      extends Reducer<IntWritable, NullWritable, NullWritable, NullWritable> {
+
+    private long reduceSleepDuration = REDUCE_SLEEP_TIME_DEFAULT;
+    private int reduceSleepCount = 1;
+    private int count = 0;
+
+    /** Derives the per-key sleep duration from the job configuration. */
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      final Configuration jobConf = context.getConfiguration();
+      reduceSleepCount = jobConf.getInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
+      if (reduceSleepCount == 0) {
+        reduceSleepDuration = 0;
+      } else {
+        final long totalSleep =
+            jobConf.getLong(REDUCE_SLEEP_TIME, REDUCE_SLEEP_TIME_DEFAULT);
+        reduceSleepDuration = totalSleep / reduceSleepCount;
+      }
+    }
+
+    /** Reports the remaining sleep time as status, then sleeps once. */
+    @Override
+    public void reduce(IntWritable key, Iterable<NullWritable> values,
+        Context context)
+        throws IOException {
+      try {
+        final long remaining = reduceSleepDuration * (reduceSleepCount - count);
+        context.setStatus("Sleeping... (" + remaining + ") ms left");
+        Thread.sleep(reduceSleepDuration);
+      } catch (InterruptedException interrupted) {
+        throw new IOException("Interrupted while sleeping", interrupted);
+      }
+      count++;
+    }
+  }
+
+  /**
+   * A class used to map the estimator implementation to the expected
+   * test results: the number of map and reduce tasks the job launches
+   * without speculation, and whether the estimator is expected to speculate.
+   */
+  class EstimatorMetricsPair {
+
+    private Class<?> estimatorClass;
+    private int expectedMapTasks;
+    private int expectedReduceTasks;
+    private boolean speculativeEstimator;
+
+    EstimatorMetricsPair(Class<?> estimatorClass, int mapTasks, int reduceTasks,
+        boolean isToSpeculate) {
+      this.estimatorClass = estimatorClass;
+      this.expectedMapTasks = mapTasks;
+      this.expectedReduceTasks = reduceTasks;
+      this.speculativeEstimator = isToSpeculate;
+    }
+
+    /**
+     * @param counters the counters of the finished job.
+     * @return true if more map or reduce tasks were launched than expected,
+     *         i.e. at least one attempt was speculated.
+     */
+    boolean didSpeculate(Counters counters) {
+      long launchedMaps = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+          .getValue();
+      long launchedReduce = counters
+          .findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES)
+          .getValue();
+      boolean isSpeculated =
+          (launchedMaps > expectedMapTasks
+              || launchedReduce > expectedReduceTasks);
+      return isSpeculated;
+    }
+
+    /**
+     * Builds the assertion message. Both launched counts are always
+     * reported: a speculative estimator fails when the launched counts are
+     * equal to the expected ones, so filtering on strict inequality would
+     * leave the message without any numbers.
+     *
+     * @param counters the counters of the finished job.
+     * @return the message describing launched versus expected tasks.
+     */
+    String getErrorMessage(Counters counters) {
+      long launchedMaps = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+          .getValue();
+      long launchedReduce = counters
+          .findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES)
+          .getValue();
+      String expectation = speculativeEstimator
+          ? "expected speculation"
+          : "expected no speculation";
+      return "Unexpected tasks running estimator "
+          + estimatorClass.getName() + "\n\t"
+          + expectation
+          + ": maps " + launchedMaps + ", expected: " + expectedMapTasks
+          + ", reduces " + launchedReduce + ", expected: "
+          + expectedReduceTasks;
+    }
+  }
+
+  @Test
+  public void testExecDynamicSlowingSpeculative() throws Exception {
+    /*------------------------------------------------------------------
+     * Test that Map/Red speculates because:
+     * 1- all tasks have same progress rate except for task_0
+     * 2- task_0 slows down by dynamic increasing factor
+     * 3- A good estimator should readjust the estimation and the speculator
+     *    launches a new task.
+     *
+     * Expected:
+     * A- SimpleExponentialTaskRuntimeEstimator: speculates a successful
+     *    attempt to beat the slowing task_0
+     * B- LegacyTaskRuntimeEstimator: speculates an attempt
+     * C- ExponentiallySmoothedTaskRuntimeEstimator: Fails to detect the slow
+     *    down and never speculates but it may speculate other tasks
+     *    (mappers or reducers)
+     * -----------------------------------------------------------------
+     */
+    chosenSleepCalc = "dynamic_slowing_run";
+    if (shouldSkipScenario()) {
+      return;
+    }
+    runScenarioAndVerify("Dynamic Slow Progress", true,
+        new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class,
+            myNumMapper, myNumReduce, true),
+        new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class,
+            myNumMapper, myNumReduce, true),
+        new EstimatorMetricsPair(
+            ExponentiallySmoothedTaskRuntimeEstimator.class,
+            myNumMapper, myNumReduce, true));
+  }
+
+
+  @Test
+  public void testExecSlowNonSpeculative() throws Exception {
+    /*------------------------------------------------------------------
+     * Test that Map/Red does not speculate because:
+     * 1- all tasks have same progress rate except for task_0
+     * 2- task_0 sleeps 1.2 times longer per record after 40% of the workload
+     *    (see SlowingSleepDurationCalcImpl)
+     * 3- A good estimator may adjust the estimation that the task will finish
+     *    sooner than a new speculated task.
+     *
+     * Expected:
+     * A- SimpleExponentialTaskRuntimeEstimator: does not speculate because
+     *    the new attempt estimated end time is not going to be smaller than the
+     *    original end time.
+     * B- LegacyTaskRuntimeEstimator: speculates an attempt
+     * C- ExponentiallySmoothedTaskRuntimeEstimator: speculates an attempt.
+     * -----------------------------------------------------------------
+     */
+    chosenSleepCalc = "slowing_run";
+    if (shouldSkipScenario()) {
+      return;
+    }
+    runScenarioAndVerify("Linear Slow Progress Non Speculative", true,
+        new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class,
+            myNumMapper, myNumReduce, false),
+        new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class,
+            myNumMapper, myNumReduce, true),
+        new EstimatorMetricsPair(
+            ExponentiallySmoothedTaskRuntimeEstimator.class,
+            myNumMapper, myNumReduce, true));
+  }
+
+  @Test
+  public void testExecStepStalledSpeculative() throws Exception {
+    /*------------------------------------------------------------------
+     * Test that Map/Red speculates because:
+     * 1- all tasks have same progress rate except for task_0
+     * 2- task_0 sleeps 10000 times longer per record after 40% of the
+     *    workload (see StepStalledSleepDurationCalcImpl)
+     * 3- A good estimator should detect the stall and the speculator
+     *    launches a new attempt.
+     *
+     * Expected:
+     * A- SimpleExponentialTaskRuntimeEstimator: speculates
+     * B- LegacyTaskRuntimeEstimator: speculates
+     * C- ExponentiallySmoothedTaskRuntimeEstimator: speculates
+     * -----------------------------------------------------------------
+     */
+    chosenSleepCalc = "step_stalled_run";
+    if (shouldSkipScenario()) {
+      return;
+    }
+    runScenarioAndVerify("Stalled Progress", true,
+        new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class,
+            myNumMapper, myNumReduce, true),
+        new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class,
+            myNumMapper, myNumReduce, true),
+        new EstimatorMetricsPair(
+            ExponentiallySmoothedTaskRuntimeEstimator.class,
+            myNumMapper, myNumReduce, true));
+  }
+
+  @Test
+  public void testExecStalledSpeculative() throws Exception {
+    /*------------------------------------------------------------------
+     * Test that Map/Red speculates because:
+     * 1- all tasks have same progress rate except for task_0
+     * 2- task_0 has long sleep duration (1000 times the default, see
+     *    StalledSleepDurationCalcImpl)
+     * 3- A good estimator should detect the stall and the speculator
+     *    launches a new attempt.
+     *
+     * Expected:
+     * A- SimpleExponentialTaskRuntimeEstimator: speculates
+     * B- LegacyTaskRuntimeEstimator: speculates
+     * C- ExponentiallySmoothedTaskRuntimeEstimator: speculates
+     * -----------------------------------------------------------------
+     */
+    chosenSleepCalc = "stalled_run";
+    if (shouldSkipScenario()) {
+      return;
+    }
+    runScenarioAndVerify("Stalled Progress", true,
+        new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class,
+            myNumMapper, myNumReduce, true),
+        new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class,
+            myNumMapper, myNumReduce, true),
+        new EstimatorMetricsPair(
+            ExponentiallySmoothedTaskRuntimeEstimator.class,
+            myNumMapper, myNumReduce, true));
+  }
+
+  @Test
+  public void testExecNonSpeculative() throws Exception {
+    /*------------------------------------------------------------------
+     * Test that Map/Red does not speculate because all tasks progress in the
+     *    same rate.
+     *
+     * Expected:
+     * A- SimpleExponentialTaskRuntimeEstimator: does not speculate
+     * B- LegacyTaskRuntimeEstimator: speculates
+     * C- ExponentiallySmoothedTaskRuntimeEstimator: speculates
+     * -----------------------------------------------------------------
+     */
+    // chosenSleepCalc keeps the default scenario assigned in setup().
+    if (shouldSkipScenario()) {
+      return;
+    }
+    // This scenario does not assert on the number of failed maps.
+    runScenarioAndVerify("No Speculation", false,
+        new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class,
+            myNumMapper, myNumReduce, true),
+        new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class,
+            myNumMapper, myNumReduce, false),
+        new EstimatorMetricsPair(
+            ExponentiallySmoothedTaskRuntimeEstimator.class,
+            myNumMapper, myNumReduce, true));
+  }
+
+  /**
+   * Decides whether the current test has nothing to run: either the MR
+   * application jar is missing (in which case setup() did not start the
+   * cluster) or the chosen scenario is listed in {@code ignoredTests}.
+   *
+   * @return true if the calling test should return immediately.
+   */
+  private boolean shouldSkipScenario() {
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+          + " not found. Not running test.");
+      return true;
+    }
+    return ignoredTests.contains(chosenSleepCalc);
+  }
+
+  /**
+   * Runs the sleep job for the expectation that matches the estimator under
+   * test and verifies that the job succeeded and that speculation happened
+   * (or not) as expected.
+   *
+   * @param scenario the scenario name used in the log message.
+   * @param checkFailedMaps whether to assert that no map attempt failed.
+   * @param estimatorPairs the expectations, one per known estimator.
+   * @throws Exception if the job cannot be run.
+   */
+  private void runScenarioAndVerify(String scenario, boolean checkFailedMaps,
+      EstimatorMetricsPair... estimatorPairs) throws Exception {
+    for (EstimatorMetricsPair specEstimator : estimatorPairs) {
+      if (!estimatorClass.equals(specEstimator.estimatorClass)) {
+        continue;
+      }
+      LOG.info("+++ " + scenario + " testing against "
+          + estimatorClass.getName() + " +++");
+      Job job = runSpecTest();
+
+      boolean succeeded = job.waitForCompletion(true);
+      Assert.assertTrue(
+          "Job expected to succeed with estimator " + estimatorClass.getName(),
+          succeeded);
+      Assert.assertEquals(
+          "Job expected to succeed with estimator " + estimatorClass.getName(),
+          JobStatus.State.SUCCEEDED, job.getJobState());
+      Counters counters = job.getCounters();
+
+      String errorMessage = specEstimator.getErrorMessage(counters);
+      boolean didSpeculate = specEstimator.didSpeculate(counters);
+      // JUnit order is (message, expected, actual).
+      Assert.assertEquals(errorMessage,
+          specEstimator.speculativeEstimator, didSpeculate);
+      if (checkFailedMaps) {
+        Assert.assertEquals(
+            "Failed maps higher than 0 " + estimatorClass.getName(),
+            0, counters.findCounter(JobCounter.NUM_FAILED_MAPS).getValue());
+      }
+    }
+  }
+
+  /**
+   * Configures a sleep job for the estimator under test and submits it.
+   * @return the submitted job; the caller waits for its completion
+   */
+  private Job runSpecTest()
+      throws IOException, ClassNotFoundException, InterruptedException {
+    Configuration conf = mrCluster.getConfig();
+    conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, ENABLE_SPECULATIVE_MAP);
+    conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, ENABLE_SPECULATIVE_REDUCE);
+    conf.setClass(MRJobConfig.MR_AM_TASK_ESTIMATOR, estimatorClass,
+        TaskRuntimeEstimator.class);
+    conf.setLong(MAP_SLEEP_TIME, myMapSleepTime);
+    conf.setLong(REDUCE_SLEEP_TIME, myReduceSleepTime);
+    conf.setInt(MAP_SLEEP_COUNT, myMapSleepCount);
+    conf.setInt(REDUCE_SLEEP_COUNT, myReduceSleepCount);
+    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0F);
+    conf.setInt(MRJobConfig.NUM_MAPS, myNumMapper);
+    conf.set(MAP_SLEEP_CALCULATOR_TYPE, chosenSleepCalc);
+    Job job = Job.getInstance(conf);
+    job.setJarByClass(TestSpeculativeExecution.class);
+    job.setMapperClass(SpeculativeSleepMapper.class);
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(NullWritable.class);
+    job.setReducerClass(SpeculativeSleepReducer.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    job.setInputFormatClass(SpeculativeSleepInputFormat.class);
+    job.setPartitionerClass(SpeculativeSleepJobPartitioner.class);
+    job.setNumReduceTasks(myNumReduce);
+    FileInputFormat.addInputPath(job, new Path("ignored"));
+    // Best-effort removal of the output directory if it exists.
+    try {
+      localFs.delete(TEST_OUT_DIR, true);
+    } catch (IOException e) {
+      LOG.info("Failed to delete " + TEST_OUT_DIR + "; continuing", e);
+    }
+    FileOutputFormat.setOutputPath(job, TEST_OUT_DIR);
+
+    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+    job.setMaxMapAttempts(2);
+
+    job.submit();
+    return job;
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
index de171c7..940f142 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
@@ -18,12 +18,17 @@
 
 package org.apache.hadoop.mapreduce.v2;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.SimpleExponentialTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
 import org.junit.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -48,13 +53,31 @@ import org.apache.hadoop.yarn.util.SystemClock;
 import org.junit.Test;
 
 import com.google.common.base.Supplier;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 @SuppressWarnings({ "unchecked", "rawtypes" })
+@RunWith(Parameterized.class)
 public class TestSpeculativeExecutionWithMRApp {
 
   private static final int NUM_MAPPERS = 5;
   private static final int NUM_REDUCERS = 0;
 
+  @Parameterized.Parameters(name = "{index}: TaskEstimator(EstimatorClass {0})")
+  public static Collection<Object[]> getTestParameters() {
+    // one row per run; each row holds the single constructor argument
+    return Arrays.asList(
+        new Object[] {SimpleExponentialTaskRuntimeEstimator.class},
+        new Object[] {LegacyTaskRuntimeEstimator.class});
+  }
+
+  private final Class<? extends TaskRuntimeEstimator> estimatorClass;
+
+  public TestSpeculativeExecutionWithMRApp(
+      Class<? extends TaskRuntimeEstimator> estimatorKlass) {
+    this.estimatorClass = estimatorKlass; // chosen by the Parameterized runner
+  }
+
   @Test
   public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception {
 
@@ -64,7 +87,7 @@ public class TestSpeculativeExecutionWithMRApp {
 
     MRApp app =
         new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true, clock);
-    Job job = app.submit(new Configuration(), true, true);
+    Job job = app.submit(createConfiguration(), true, true);
     app.waitForState(job, JobState.RUNNING);
 
     Map<TaskId, Task> tasks = job.getTasks();
@@ -136,7 +159,7 @@ public class TestSpeculativeExecutionWithMRApp {
 
     MRApp app =
         new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true, clock);
-    Job job = app.submit(new Configuration(), true, true);
+    Job job = app.submit(createConfiguration(), true, true);
     app.waitForState(job, JobState.RUNNING);
 
     Map<TaskId, Task> tasks = job.getTasks();
@@ -191,6 +214,9 @@ public class TestSpeculativeExecutionWithMRApp {
     }
 
     clock.setTime(System.currentTimeMillis() + 15000);
+    // give a chance to the speculator thread to run a scan before we proceed
+    // with updating events
+    Thread.yield();
     for (Map.Entry<TaskId, Task> task : tasks.entrySet()) {
       for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : task.getValue()
         .getAttempts().entrySet()) {
@@ -251,4 +277,20 @@ public class TestSpeculativeExecutionWithMRApp {
     status.taskState = state;
     return status;
   }
+
+  private Configuration createConfiguration() {
+    final Configuration config = new Configuration();
+    config.setClass(MRJobConfig.MR_AM_TASK_ESTIMATOR, estimatorClass,
+        TaskRuntimeEstimator.class);
+    if (!SimpleExponentialTaskRuntimeEstimator.class.equals(estimatorClass)) {
+      // no estimator-specific settings are applied for other estimators
+      return config;
+    }
+    // settings specific to the SimpleExponential estimator
+    config.setInt(
+        MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_SKIP_INITIALS, 1);
+    config.setLong(
+        MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS, 10 * 1000L);
+    return config;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org