You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by je...@apache.org on 2019/11/05 21:48:40 UTC
[hadoop] branch branch-2 updated: MAPREDUCE-7208. Tuning
TaskRuntimeEstimator. (Ahmed Hussein via jeagles)
This is an automated email from the ASF dual-hosted git repository.
jeagles pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new c59b1b6 MAPREDUCE-7208. Tuning TaskRuntimeEstimator. (Ahmed Hussein via jeagles)
c59b1b6 is described below
commit c59b1b66a5264c3164ac6c9356b51de9b5349c66
Author: Ahmed Hussein <ah...@apache.org>
AuthorDate: Tue Nov 5 15:48:27 2019 -0600
MAPREDUCE-7208. Tuning TaskRuntimeEstimator. (Ahmed Hussein via jeagles)
Signed-off-by: Jonathan Eagles <je...@gmail.com>
---
.../mapreduce/v2/app/speculate/DataStatistics.java | 18 +-
.../v2/app/speculate/DefaultSpeculator.java | 3 +-
.../v2/app/speculate/NullTaskRuntimesEngine.java | 5 +
.../SimpleExponentialTaskRuntimeEstimator.java | 170 ++++
.../v2/app/speculate/StartEndTimesBase.java | 8 +-
.../v2/app/speculate/TaskRuntimeEstimator.java | 13 +
.../forecast/SimpleExponentialSmoothing.java | 196 +++++
.../mapreduce/v2/app/TestRuntimeEstimators.java | 8 +
.../forecast/TestSimpleExponentialForecast.java | 120 +++
.../org/apache/hadoop/mapreduce/MRJobConfig.java | 31 +
.../mapreduce/v2/TestSpeculativeExecOnCluster.java | 935 +++++++++++++++++++++
.../v2/TestSpeculativeExecutionWithMRApp.java | 46 +-
12 files changed, 1546 insertions(+), 7 deletions(-)
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DataStatistics.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DataStatistics.java
index cfaffaf..9f1c122 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DataStatistics.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DataStatistics.java
@@ -71,8 +71,22 @@ public class DataStatistics {
return count;
}
+ /**
+  * Computes the upper bound of the 95% confidence interval of the mean,
+  * i.e. {@code mean + 1.96 * std / sqrt(count)}, where 1.96 is the
+  * standard z-score for a 95% confidence level.
+  *
+  * @return the mean plus the 95% confidence margin, or 0.0 when
+  *         {@code count} is at most 1
+  */
+ public synchronized double meanCI() {
+   if (count > 1) {
+     double average = mean();
+     double margin = 1.96 * std() / Math.sqrt(count);
+     return average + margin;
+   }
+   // Too few samples to derive a spread.
+   return 0.0;
+ }
+
public String toString() {
- return "DataStatistics: count is " + count + ", sum is " + sum +
- ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std();
+ return "DataStatistics: count is " + count + ", sum is " + sum
+ + ", sumSquares is " + sumSquares + " mean is " + mean()
+ + " std() is " + std() + ", meanCI() is " + meanCI();
}
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
index c6bc441..4ab8f71 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
@@ -414,7 +414,8 @@ public class DefaultSpeculator extends AbstractService implements
if (estimatedRunTime == data.getEstimatedRunTime()
&& progress == data.getProgress()) {
// Previous stats are same as same stats
- if (data.notHeartbeatedInAWhile(now)) {
+ if (data.notHeartbeatedInAWhile(now)
+ || estimator.hasStagnatedProgress(runningTaskAttemptID, now)) {
// Stats have stagnated for a while, simulate heart-beat.
TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
taskAttemptStatus.id = runningTaskAttemptID;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java
index 7211ff4..eb52bc7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java
@@ -69,4 +69,9 @@ public class NullTaskRuntimesEngine implements TaskRuntimeEstimator {
return -1L;
}
+ /**
+  * Always reports that the attempt has not stagnated.
+  *
+  * @param id the attempt being asked about (unused)
+  * @param timeStamp the time of the report (unused)
+  * @return always {@code false}
+  */
+ @Override
+ public boolean hasStagnatedProgress(TaskAttemptId id, long timeStamp) {
+   return false;
+ }
+
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SimpleExponentialTaskRuntimeEstimator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SimpleExponentialTaskRuntimeEstimator.java
new file mode 100644
index 0000000..f244b20
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SimpleExponentialTaskRuntimeEstimator.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.speculate;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.speculate.forecast.SimpleExponentialSmoothing;
+
+/**
+ * A task Runtime Estimator based on exponential smoothing.
+ */
+public class SimpleExponentialTaskRuntimeEstimator extends StartEndTimesBase {
+ private final static long DEFAULT_ESTIMATE_RUNTIME = -1L;
+
+ /**
+ * Constant time used to calculate the smoothing exponential factor.
+ */
+ private long constTime;
+
+ /**
+ * Number of readings before we consider the estimate stable.
+ * Otherwise, the estimate will be skewed due to the initial estimate
+ */
+ private int skipCount;
+
+ /**
+ * Time window to automatically update the count of the skipCount. This is
+ * needed when a task stalls without any progress, causing the estimator to
+ * return -1 as an estimatedRuntime.
+ */
+ private long stagnatedWindow;
+
+ private final ConcurrentMap<TaskAttemptId,
+ AtomicReference<SimpleExponentialSmoothing>>
+ estimates = new ConcurrentHashMap<>();
+
+ private SimpleExponentialSmoothing getForecastEntry(TaskAttemptId attemptID) {
+ AtomicReference<SimpleExponentialSmoothing> entryRef = estimates
+ .get(attemptID);
+ if (entryRef == null) {
+ return null;
+ }
+ return entryRef.get();
+ }
+
+ private void incorporateReading(TaskAttemptId attemptID,
+ float newRawData, long newTimeStamp) {
+ SimpleExponentialSmoothing foreCastEntry = getForecastEntry(attemptID);
+ if (foreCastEntry == null) {
+ Long tStartTime = startTimes.get(attemptID);
+ // skip if the startTime is not set yet
+ if(tStartTime == null) {
+ return;
+ }
+ estimates.putIfAbsent(attemptID,
+ new AtomicReference<>(SimpleExponentialSmoothing.createForecast(
+ constTime, skipCount, stagnatedWindow,
+ tStartTime)));
+ incorporateReading(attemptID, newRawData, newTimeStamp);
+ return;
+ }
+ foreCastEntry.incorporateReading(newTimeStamp, newRawData);
+ }
+
+ @Override
+ public void contextualize(Configuration conf, AppContext context) {
+ super.contextualize(conf, context);
+
+ constTime
+ = conf.getLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS,
+ MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS);
+
+ stagnatedWindow = Math.max(2 * constTime, conf.getLong(
+ MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_STAGNATED_MS,
+ MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_STAGNATED_MS));
+
+ skipCount = conf
+ .getInt(MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_SKIP_INITIALS,
+ MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_INITIALS);
+ }
+
+ @Override
+ public long estimatedRuntime(TaskAttemptId id) {
+ SimpleExponentialSmoothing foreCastEntry = getForecastEntry(id);
+ if (foreCastEntry == null) {
+ return DEFAULT_ESTIMATE_RUNTIME;
+ }
+ // TODO: What should we do when estimate is zero
+ double remainingWork = Math.min(1.0, 1.0 - foreCastEntry.getRawData());
+ double forecast = foreCastEntry.getForecast();
+ if (forecast <= 0.0) {
+ return DEFAULT_ESTIMATE_RUNTIME;
+ }
+ long remainingTime = (long)(remainingWork / forecast);
+ long estimatedRuntime = remainingTime
+ + foreCastEntry.getTimeStamp()
+ - foreCastEntry.getStartTime();
+ return estimatedRuntime;
+ }
+
+ @Override
+ public long estimatedNewAttemptRuntime(TaskId id) {
+ DataStatistics statistics = dataStatisticsForTask(id);
+
+ if (statistics == null) {
+ return -1L;
+ }
+
+ double statsMeanCI = statistics.meanCI();
+ double expectedVal =
+ statsMeanCI + Math.min(statsMeanCI * 0.25, statistics.std() / 2);
+ return (long)(expectedVal);
+ }
+
+ @Override
+ public boolean hasStagnatedProgress(TaskAttemptId id, long timeStamp) {
+ SimpleExponentialSmoothing foreCastEntry = getForecastEntry(id);
+ if(foreCastEntry == null) {
+ return false;
+ }
+ return foreCastEntry.isDataStagnated(timeStamp);
+ }
+
+ @Override
+ public long runtimeEstimateVariance(TaskAttemptId id) {
+ SimpleExponentialSmoothing forecastEntry = getForecastEntry(id);
+ if (forecastEntry == null) {
+ return DEFAULT_ESTIMATE_RUNTIME;
+ }
+ double forecast = forecastEntry.getForecast();
+ if (forecastEntry.isDefaultForecast(forecast)) {
+ return DEFAULT_ESTIMATE_RUNTIME;
+ }
+ //TODO: What is the best way to measure variance in runtime
+ return 0L;
+ }
+
+ @Override
+ public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+ super.updateAttempt(status, timestamp);
+ TaskAttemptId attemptID = status.id;
+
+ float progress = status.progress;
+
+ incorporateReading(attemptID, progress, timestamp);
+ }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/StartEndTimesBase.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/StartEndTimesBase.java
index aee4821..0de2642 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/StartEndTimesBase.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/StartEndTimesBase.java
@@ -152,8 +152,7 @@ abstract class StartEndTimesBase implements TaskRuntimeEstimator {
if (statistics == null) {
return -1L;
}
-
- return (long)statistics.mean();
+ return (long) statistics.mean();
}
@Override
@@ -208,4 +207,9 @@ abstract class StartEndTimesBase implements TaskRuntimeEstimator {
}
}
}
+
+ /**
+  * Default implementation: never reports an attempt as stagnated.
+  * Subclasses that track progress readings may override this.
+  *
+  * @param id the attempt being asked about (unused)
+  * @param timeStamp the time of the report (unused)
+  * @return always {@code false}
+  */
+ @Override
+ public boolean hasStagnatedProgress(TaskAttemptId id, long timeStamp) {
+   return false;
+ }
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/TaskRuntimeEstimator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/TaskRuntimeEstimator.java
index ce4825f..9cadefe 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/TaskRuntimeEstimator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/TaskRuntimeEstimator.java
@@ -87,4 +87,17 @@ public interface TaskRuntimeEstimator {
*
*/
public long runtimeEstimateVariance(TaskAttemptId id);
+
+ /**
+ *
+ * Returns true if the estimator has recorded no updates for the attempt
+ * within a threshold time window. This helps to identify task attempts that
+ * are stalled at the beginning of execution.
+ *
+ * @param id the {@link TaskAttemptId} of the attempt we are asking about
+ * @param timeStamp the time of the report we compare with
+ * @return true if the task attempt has made no progress for a given time
+ *         window
+ *
+ */
+ public boolean hasStagnatedProgress(TaskAttemptId id, long timeStamp);
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/forecast/SimpleExponentialSmoothing.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/forecast/SimpleExponentialSmoothing.java
new file mode 100644
index 0000000..e1ef7be
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/forecast/SimpleExponentialSmoothing.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.speculate.forecast;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Implementation of the static model for Simple exponential smoothing.
+ *
+ * <p>The model smooths the rate of change of a raw reading over time. Its
+ * state lives in an immutable {@code ForecastRecord} that is swapped
+ * atomically whenever a new reading is incorporated.
+ */
+public class SimpleExponentialSmoothing {
+  /**
+   * Sentinel returned by the getters when no forecast, raw data or error
+   * value is available.
+   */
+  public final static double DEFAULT_FORECAST = -1.0;
+  /** Readings to accumulate before a forecast is exposed. */
+  private final int kMinimumReads;
+  /** Window used by {@link #isDataStagnated(long)}. */
+  private final long kStagnatedWindow;
+  /** Time at which the raw data is taken to be zero. */
+  private final long startTime;
+  /** Time constant of the smoothing factor; may be raised by append(). */
+  private long timeConstant;
+
+  private final AtomicReference<ForecastRecord> forecastRefEntry;
+
+  /**
+   * Creates a model.
+   *
+   * @param timeConstant time constant used for the smoothing factor
+   * @param skipCnt number of readings before a forecast is exposed
+   * @param stagnatedWindow window after which early data counts as stagnated
+   * @param timeStamp start time of the series
+   * @return a new model without any reading
+   */
+  public static SimpleExponentialSmoothing createForecast(long timeConstant,
+      int skipCnt, long stagnatedWindow, long timeStamp) {
+    return new SimpleExponentialSmoothing(timeConstant, skipCnt,
+        stagnatedWindow, timeStamp);
+  }
+
+  SimpleExponentialSmoothing(long ktConstant, int skipCnt,
+      long stagnatedWindow, long timeStamp) {
+    kMinimumReads = skipCnt;
+    kStagnatedWindow = stagnatedWindow;
+    this.timeConstant = ktConstant;
+    this.startTime = timeStamp;
+    this.forecastRefEntry = new AtomicReference<ForecastRecord>(null);
+  }
+
+  /**
+   * Immutable snapshot of the model after a given number of readings.
+   */
+  private class ForecastRecord {
+    private final double alpha;
+    private final long timeStamp;
+    private final double sample;
+    private final double rawData;
+    private final double forecast;
+    private final double sseError;
+    private final long myIndex;
+
+    /** Creates the seed record (index 0, no accumulated error). */
+    ForecastRecord(double forecast, double rawData, long timeStamp) {
+      this(0.0, forecast, rawData, forecast, timeStamp, 0.0, 0);
+    }
+
+    ForecastRecord(double alpha, double sample, double rawData,
+        double forecast, long timeStamp, double accError, long index) {
+      this.timeStamp = timeStamp;
+      this.alpha = alpha;
+      this.sample = sample;
+      this.forecast = forecast;
+      this.rawData = rawData;
+      this.sseError = accError;
+      this.myIndex = index;
+    }
+
+    /** Rate of change between this record and the new reading. */
+    private double preProcessRawData(double rData, long newTime) {
+      return processRawData(this.rawData, this.timeStamp, rData, newTime);
+    }
+
+    /**
+     * Derives the record that follows this one once a reading is applied.
+     *
+     * @param newTimeStamp time of the reading
+     * @param rData raw value of the reading
+     * @return the follow-up record, or this record when the reading is not
+     *         strictly newer than it
+     */
+    public ForecastRecord append(long newTimeStamp, double rData) {
+      // A reading that is not strictly newer spans no time, so its rate is
+      // a division by zero (NaN or Infinity). Blending that into the
+      // forecast would poison every later forecast, so ignore the reading.
+      // Its raw value is picked up by the next, newer reading.
+      if (this.timeStamp >= newTimeStamp) {
+        return this;
+      }
+      double newSample = preProcessRawData(rData, newTimeStamp);
+      // Negative by construction, so the factor below lies within (0, 1].
+      long deltaTime = this.timeStamp - newTimeStamp;
+      if (this.myIndex == kMinimumReads) {
+        timeConstant = Math.max(timeConstant, newTimeStamp - startTime);
+      }
+      double smoothFactor =
+          1 - Math.exp(((double) deltaTime) / timeConstant);
+      double forecastVal =
+          smoothFactor * newSample + (1.0 - smoothFactor) * this.forecast;
+      double newSSEError =
+          this.sseError + Math.pow(newSample - this.forecast, 2);
+      return new ForecastRecord(smoothFactor, newSample, rData, forecastVal,
+          newTimeStamp, newSSEError, this.myIndex + 1);
+    }
+
+  }
+
+  /**
+   * Tells whether the model stopped receiving readings while it was still
+   * collecting its initial readings.
+   *
+   * @param timeStamp the current time
+   * @return true if a record exists, at most {@code skipCnt} readings were
+   *         applied, and the last record is older than the stagnation window
+   */
+  public boolean isDataStagnated(long timeStamp) {
+    ForecastRecord rec = forecastRefEntry.get();
+    if (rec != null && rec.myIndex <= kMinimumReads) {
+      return (rec.timeStamp + kStagnatedWindow) < timeStamp;
+    }
+    return false;
+  }
+
+  /**
+   * Computes the rate of change between two readings. The result is NaN or
+   * infinite when both readings carry the same time.
+   */
+  static double processRawData(double oldRawData, long oldTime,
+      double newRawData, long newTime) {
+    double rate = (newRawData - oldRawData) / (newTime - oldTime);
+    return rate;
+  }
+
+  /**
+   * Applies a reading to the model. Readings that are not strictly newer
+   * than the latest record (or than the start time, for the first reading)
+   * are ignored.
+   *
+   * @param timeStamp time of the reading
+   * @param rawData raw value of the reading
+   */
+  public void incorporateReading(long timeStamp, double rawData) {
+    ForecastRecord oldRec = forecastRefEntry.get();
+    if (oldRec == null) {
+      if (timeStamp <= startTime) {
+        // No time has elapsed since the start, so there is no rate to seed
+        // the model with; wait for a later reading.
+        return;
+      }
+      double oldForecast =
+          processRawData(0, startTime, rawData, timeStamp);
+      forecastRefEntry.compareAndSet(null,
+          new ForecastRecord(oldForecast, 0.0, startTime));
+      // Either our seed or the one of a concurrent caller is now in place.
+      oldRec = forecastRefEntry.get();
+    }
+    while (!forecastRefEntry.compareAndSet(oldRec, oldRec.append(timeStamp,
+        rawData))) {
+      oldRec = forecastRefEntry.get();
+    }
+
+  }
+
+  /**
+   * @return the smoothed rate, or {@link #DEFAULT_FORECAST} until more than
+   *         {@code skipCnt} readings have been applied
+   */
+  public double getForecast() {
+    ForecastRecord rec = forecastRefEntry.get();
+    if (rec != null && rec.myIndex > kMinimumReads) {
+      return rec.forecast;
+    }
+    return DEFAULT_FORECAST;
+  }
+
+  /** @return true if the value is the {@link #DEFAULT_FORECAST} sentinel */
+  public boolean isDefaultForecast(double value) {
+    return value == DEFAULT_FORECAST;
+  }
+
+  /**
+   * @return the accumulated squared error, or {@link #DEFAULT_FORECAST}
+   *         when the model has no record
+   */
+  public double getSSE() {
+    ForecastRecord rec = forecastRefEntry.get();
+    if (rec != null) {
+      return rec.sseError;
+    }
+    return DEFAULT_FORECAST;
+  }
+
+  /**
+   * @param bound exclusive upper bound for the accumulated squared error
+   * @return true if an error value exists and is below the bound
+   */
+  public boolean isErrorWithinBound(double bound) {
+    double squaredErr = getSSE();
+    if (squaredErr < 0) {
+      return false;
+    }
+    return bound > squaredErr;
+  }
+
+  /**
+   * @return the raw value of the latest record, or
+   *         {@link #DEFAULT_FORECAST} when the model has no record
+   */
+  public double getRawData() {
+    ForecastRecord rec = forecastRefEntry.get();
+    if (rec != null) {
+      return rec.rawData;
+    }
+    return DEFAULT_FORECAST;
+  }
+
+  /** @return the time of the latest record, or 0 when there is none */
+  public long getTimeStamp() {
+    ForecastRecord rec = forecastRefEntry.get();
+    if (rec != null) {
+      return rec.timeStamp;
+    }
+    return 0L;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public AtomicReference<ForecastRecord> getForecastRefEntry() {
+    return forecastRefEntry;
+  }
+
+  @Override
+  public String toString() {
+    String res = "NULL";
+    ForecastRecord rec = forecastRefEntry.get();
+    if (rec != null) {
+      res = "rec.index = " + rec.myIndex + ", forecast t: " + rec.timeStamp +
+          ", forecast: " + rec.forecast
+          + ", sample: " + rec.sample + ", raw: " + rec.rawData + ", error: "
+          + rec.sseError + ", alpha: " + rec.alpha;
+    }
+    return res;
+  }
+
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
index 3c9e05d..9b26dbc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator;
import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.SimpleExponentialTaskRuntimeEstimator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
@@ -255,6 +256,13 @@ public class TestRuntimeEstimators {
coreTestEstimator(specificEstimator, 3);
}
+ /** Runs the shared estimator scenario against the simple exponential one. */
+ @Test
+ public void testSimpleExponentialEstimator() throws Exception {
+   coreTestEstimator(new SimpleExponentialTaskRuntimeEstimator(), 3);
+ }
+
int taskTypeSlots(TaskType type) {
return type == TaskType.MAP ? MAP_SLOT_REQUIREMENT : REDUCE_SLOT_REQUIREMENT;
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/speculate/forecast/TestSimpleExponentialForecast.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/speculate/forecast/TestSimpleExponentialForecast.java
new file mode 100644
index 0000000..b669df7
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/speculate/forecast/TestSimpleExponentialForecast.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.speculate.forecast;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.util.ControlledClock;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Testing the statistical model of simple exponential estimator.
+ */
+public class TestSimpleExponentialForecast {
+  private static final Log LOG =
+      LogFactory.getLog(TestSimpleExponentialForecast.class);
+
+  private static long clockTicks = 1000L;
+  private ControlledClock clock;
+
+  /**
+   * Maps a progress value to the index of the quarter it falls in, clamped
+   * to the last valid index so that a progress of exactly 1.0 cannot index
+   * past the end of the rates array.
+   */
+  private static int quarterIndex(double progress, int rateCount) {
+    return Math.min((int) (progress / 0.25), rateCount - 1);
+  }
+
+  /**
+   * Feeds a series with a constant progress rate.
+   *
+   * @return 0 if the accumulated squared error stays below 1e-6, else 1
+   */
+  private int incTestSimpleExponentialForecast() {
+    clock = new ControlledClock();
+    clock.tickMsec(clockTicks);
+    SimpleExponentialSmoothing forecaster =
+        new SimpleExponentialSmoothing(10000,
+            12, 10000, clock.getTime());
+
+
+    double progress = 0.0;
+
+    while (progress <= 1.0) {
+      clock.tickMsec(clockTicks);
+      forecaster.incorporateReading(clock.getTime(), progress);
+      LOG.info("progress: " + progress + " --> " + forecaster.toString());
+      progress += 0.005;
+    }
+
+    return forecaster.getSSE() < Math.pow(10.0, -6) ? 0 : 1;
+  }
+
+
+  /**
+   * Feeds a series whose progress rate drops at every quarter of the run.
+   *
+   * @return 0 if the accumulated squared error stays below 1e-6, else 1
+   */
+  private int decTestSimpleExponentialForecast() {
+    clock = new ControlledClock();
+    clock.tickMsec(clockTicks);
+    SimpleExponentialSmoothing forecaster =
+        new SimpleExponentialSmoothing(800,
+            12, 10000, clock.getTime());
+
+    double progress = 0.0;
+
+    double[] progressRates = new double[]{0.005, 0.004, 0.002, 0.001};
+    while (progress <= 1.0) {
+      clock.tickMsec(clockTicks);
+      forecaster.incorporateReading(clock.getTime(), progress);
+      LOG.info("progress: " + progress + " --> " + forecaster.toString());
+      progress += progressRates[quarterIndex(progress, progressRates.length)];
+    }
+
+    return forecaster.getSSE() < Math.pow(10.0, -6) ? 0 : 1;
+  }
+
+  /**
+   * Feeds a series that stalls (zero rate) in its last quarter and resumes
+   * only after more than 1000 readings.
+   *
+   * @return 0 if the accumulated squared error stays below 1e-6, else 1
+   */
+  private int zeroTestSimpleExponentialForecast() {
+    clock = new ControlledClock();
+    clock.tickMsec(clockTicks);
+    SimpleExponentialSmoothing forecaster =
+        new SimpleExponentialSmoothing(800,
+            12, 10000, clock.getTime());
+
+    double progress = 0.0;
+
+    double[] progressRates = new double[]{0.005, 0.004, 0.002, 0.0, 0.003};
+    int progressInd = 0;
+    while (progress <= 1.0) {
+      clock.tickMsec(clockTicks);
+      forecaster.incorporateReading(clock.getTime(), progress);
+      LOG.info("progress: " + progress + " --> " + forecaster.toString());
+      int currInd = progressInd++ > 1000
+          ? 4 : quarterIndex(progress, progressRates.length);
+      progress += progressRates[currInd];
+    }
+
+    return forecaster.getSSE() < Math.pow(10.0, -6) ? 0 : 1;
+  }
+
+  @Test
+  public void testSimpleExponentialForecastLinearInc() throws Exception {
+    int res = incTestSimpleExponentialForecast();
+    // JUnit expects (message, expected, actual).
+    Assert.assertEquals("We got the wrong estimate from simple exponential.",
+        0, res);
+  }
+
+  @Test
+  public void testSimpleExponentialForecastLinearDec() throws Exception {
+    int res = decTestSimpleExponentialForecast();
+    Assert.assertEquals("We got the wrong estimate from simple exponential.",
+        0, res);
+  }
+
+  @Test
+  public void testSimpleExponentialForecastZeros() throws Exception {
+    int res = zeroTestSimpleExponentialForecast();
+    Assert.assertEquals("We got the wrong estimate from simple exponential.",
+        0, res);
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 5a72def..37be797 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -795,6 +795,37 @@ public interface MRJobConfig {
public static final String MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE =
MR_AM_PREFIX + "job.task.estimator.exponential.smooth.rate";
+ /**
+ * The lambda value (a time constant, in milliseconds) in the smoothing
+ * function of the task estimator. The default is 120 seconds.
+ */
+ String MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS =
+ MR_AM_PREFIX
+ + "job.task.estimator.simple.exponential.smooth.lambda-ms";
+ long DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS = 1000L * 120;
+
+ /**
+ * The window length, in milliseconds, after which the simple exponential
+ * smoothing considers a task attempt stagnated. The default is 360
+ * seconds.
+ */
+ String MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_STAGNATED_MS =
+ MR_AM_PREFIX
+ + "job.task.estimator.simple.exponential.smooth.stagnated-ms";
+ long DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_STAGNATED_MS =
+ 1000L * 360;
+
+ /**
+ * The number of initial readings that the estimator ignores before giving a
+ * prediction. The smoothed estimate is not accurate at the beginning of
+ * execution.
+ */
+ String MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_SKIP_INITIALS =
+ MR_AM_PREFIX
+ + "job.task.estimator.simple.exponential.smooth.skip-initials";
+
+ /**
+ * The default number of readings the estimator is going to ignore before
+ * returning the smoothed exponential prediction.
+ */
+ int DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_INITIALS = 24;
+
/** The number of threads used to handle task RPC calls.*/
public static final String MR_AM_TASK_LISTENER_THREAD_COUNT =
MR_AM_PREFIX + "job.task.listener.thread-count";
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster.java
new file mode 100644
index 0000000..02e4358
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster.java
@@ -0,0 +1,935 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.SimpleExponentialTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Test speculation on Mini Cluster.
+ */
+@Ignore
+@RunWith(Parameterized.class)
+public class TestSpeculativeExecOnCluster {
+ private static final Log LOG = LogFactory
+ .getLog(TestSpeculativeExecOnCluster.class);
+
+ private static final int NODE_MANAGERS_COUNT = 2;
+ private static final boolean ENABLE_SPECULATIVE_MAP = true;
+ private static final boolean ENABLE_SPECULATIVE_REDUCE = true;
+
+ private static final int NUM_MAP_DEFAULT = 8 * NODE_MANAGERS_COUNT;
+ private static final int NUM_REDUCE_DEFAULT = NUM_MAP_DEFAULT / 2;
+ private static final int MAP_SLEEP_TIME_DEFAULT = 60000;
+ private static final int REDUCE_SLEEP_TIME_DEFAULT = 10000;
+ private static final int MAP_SLEEP_COUNT_DEFAULT = 10000;
+ private static final int REDUCE_SLEEP_COUNT_DEFAULT = 1000;
+
+ private static final String MAP_SLEEP_COUNT =
+ "mapreduce.sleepjob.map.sleep.count";
+ private static final String REDUCE_SLEEP_COUNT =
+ "mapreduce.sleepjob.reduce.sleep.count";
+ private static final String MAP_SLEEP_TIME =
+ "mapreduce.sleepjob.map.sleep.time";
+ private static final String REDUCE_SLEEP_TIME =
+ "mapreduce.sleepjob.reduce.sleep.time";
+ private static final String MAP_SLEEP_CALCULATOR_TYPE =
+ "mapreduce.sleepjob.map.sleep.time.calculator";
+ private static final String MAP_SLEEP_CALCULATOR_TYPE_DEFAULT = "normal_run";
+
+ private static Map<String, SleepDurationCalculator> mapSleepTypeMapper;
+
+
+ private static FileSystem localFs;
+
+ static {
+ mapSleepTypeMapper = new HashMap<>();
+ mapSleepTypeMapper.put("normal_run", new SleepDurationCalcImpl());
+ mapSleepTypeMapper.put("stalled_run",
+ new StalledSleepDurationCalcImpl());
+ mapSleepTypeMapper.put("slowing_run",
+ new SlowingSleepDurationCalcImpl());
+ mapSleepTypeMapper.put("dynamic_slowing_run",
+ new DynamicSleepDurationCalcImpl());
+ mapSleepTypeMapper.put("step_stalled_run",
+ new StepStalledSleepDurationCalcImpl());
+ try {
+ localFs = FileSystem.getLocal(new Configuration());
+ } catch (IOException io) {
+ throw new RuntimeException("problem getting local fs", io);
+ }
+ }
+
+ private static final Path TEST_ROOT_DIR =
+ new Path("target",
+ TestSpeculativeExecOnCluster.class.getName() + "-tmpDir")
+ .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+ private static final Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
+ private static final Path TEST_OUT_DIR =
+ new Path(TEST_ROOT_DIR, "test.out.dir");
+
+ private MiniMRYarnCluster mrCluster;
+
+ private int myNumMapper;
+ private int myNumReduce;
+ private int myMapSleepTime;
+ private int myReduceSleepTime;
+ private int myMapSleepCount;
+ private int myReduceSleepCount;
+ private String chosenSleepCalc;
+ private Class<?> estimatorClass;
+
+
+ /**
+ * The test cases take a long time to run all the estimators against all the
+ * cases. We skip the listed sleep scenarios to reduce the execution time.
+ */
+ private List<String> ignoredTests;
+
+
+ @Parameterized.Parameters(name = "{index}: TaskEstimator(EstimatorClass {0})")
+ public static Collection<Object[]> getTestParameters() {
+ List<String> ignoredTests = Arrays.asList(new String[] {
+ "stalled_run",
+ "slowing_run",
+ "step_stalled_run"
+ });
+ return Arrays.asList(new Object[][] {
+ {SimpleExponentialTaskRuntimeEstimator.class, ignoredTests,
+ NUM_MAP_DEFAULT, NUM_REDUCE_DEFAULT},
+ {LegacyTaskRuntimeEstimator.class, ignoredTests,
+ NUM_MAP_DEFAULT, NUM_REDUCE_DEFAULT}
+ });
+ }
+
+ public TestSpeculativeExecOnCluster(
+ Class<? extends TaskRuntimeEstimator> estimatorKlass,
+ List<String> testToIgnore,
+ Integer numMapper,
+ Integer numReduce) {
+ this.ignoredTests = testToIgnore;
+ this.estimatorClass = estimatorKlass;
+ this.myNumMapper = numMapper;
+ this.myNumReduce = numReduce;
+
+ }
+
+ @Before
+ public void setup() throws IOException {
+
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+ LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ + " not found. Not running test.");
+ return;
+ }
+
+ if (mrCluster == null) {
+ mrCluster = new MiniMRYarnCluster(
+ TestSpeculativeExecution.class.getName(), NODE_MANAGERS_COUNT);
+ Configuration conf = new Configuration();
+ mrCluster.init(conf);
+ mrCluster.start();
+
+ }
+
+ // workaround the absent public distcache.
+ localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
+ localFs.setPermission(APP_JAR, new FsPermission("700"));
+ myMapSleepTime = MAP_SLEEP_TIME_DEFAULT;
+ myReduceSleepTime = REDUCE_SLEEP_TIME_DEFAULT;
+ myMapSleepCount = MAP_SLEEP_COUNT_DEFAULT;
+ myReduceSleepCount = REDUCE_SLEEP_COUNT_DEFAULT;
+ chosenSleepCalc = MAP_SLEEP_CALCULATOR_TYPE_DEFAULT;
+ }
+
+ @After
+ public void tearDown() {
+ if (mrCluster != null) {
+ mrCluster.stop();
+ mrCluster = null;
+ }
+ }
+
+ /**
+ * Overrides default behavior of Partitioner for testing.
+ */
+ public static class SpeculativeSleepJobPartitioner extends
+ Partitioner<IntWritable, NullWritable> {
+ public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
+ return k.get() % numPartitions;
+ }
+ }
+
+ /**
+ * Overrides default behavior of InputSplit for testing.
+ */
+ public static class EmptySplit extends InputSplit implements Writable {
+ public void write(DataOutput out) throws IOException { }
+ public void readFields(DataInput in) throws IOException { }
+ public long getLength() {
+ return 0L;
+ }
+ public String[] getLocations() {
+ return new String[0];
+ }
+ }
+
+ /**
+ * Input format that sleeps after updating progress.
+ */
+ public static class SpeculativeSleepInputFormat
+ extends InputFormat<IntWritable, IntWritable> {
+
+ public List<InputSplit> getSplits(JobContext jobContext) {
+ List<InputSplit> ret = new ArrayList<InputSplit>();
+ int numSplits = jobContext.getConfiguration().
+ getInt(MRJobConfig.NUM_MAPS, 1);
+ for (int i = 0; i < numSplits; ++i) {
+ ret.add(new EmptySplit());
+ }
+ return ret;
+ }
+
+ public RecordReader<IntWritable, IntWritable> createRecordReader(
+ InputSplit ignored, TaskAttemptContext taskContext)
+ throws IOException {
+ Configuration conf = taskContext.getConfiguration();
+ final int count = conf.getInt(MAP_SLEEP_COUNT, MAP_SLEEP_COUNT_DEFAULT);
+ if (count < 0) {
+ throw new IOException("Invalid map count: " + count);
+ }
+ final int redcount = conf.getInt(REDUCE_SLEEP_COUNT,
+ REDUCE_SLEEP_COUNT_DEFAULT);
+ if (redcount < 0) {
+ throw new IOException("Invalid reduce count: " + redcount);
+ }
+ final int emitPerMapTask = (redcount * taskContext.getNumReduceTasks());
+
+ return new RecordReader<IntWritable, IntWritable>() {
+ private int records = 0;
+ private int emitCount = 0;
+ private IntWritable key = null;
+ private IntWritable value = null;
+ public void initialize(InputSplit split, TaskAttemptContext context) {
+ }
+
+ public boolean nextKeyValue()
+ throws IOException {
+ if (count == 0) {
+ return false;
+ }
+ key = new IntWritable();
+ key.set(emitCount);
+ int emit = emitPerMapTask / count;
+ if ((emitPerMapTask) % count > records) {
+ ++emit;
+ }
+ emitCount += emit;
+ value = new IntWritable();
+ value.set(emit);
+ return records++ < count;
+ }
+ public IntWritable getCurrentKey() {
+ return key;
+ }
+ public IntWritable getCurrentValue() {
+ return value;
+ }
+ public void close() throws IOException { }
+ public float getProgress() throws IOException { // fraction in [0.0, 1.0]
+ return count == 0 ? 1.0f : Math.min(1.0f, records / ((float) count));
+ }
+ };
+ }
+ }
+
+ /**
+ * Interface used to simulate different progress rates of the tasks.
+ */
+ public interface SleepDurationCalculator {
+ long calcSleepDuration(TaskAttemptID taId, int currCount, int totalCount,
+ long defaultSleepDuration);
+ }
+
+ /**
+ * All tasks have the same progress.
+ */
+ public static class SleepDurationCalcImpl implements SleepDurationCalculator {
+
+ private double threshold = 1.0;
+ private double slowFactor = 1.0;
+
+ SleepDurationCalcImpl() {
+
+ }
+
+ public long calcSleepDuration(TaskAttemptID taId, int currCount,
+ int totalCount, long defaultSleepDuration) {
+ if (threshold <= ((double) currCount) / totalCount) {
+ return (long) (slowFactor * defaultSleepDuration);
+ }
+ return defaultSleepDuration;
+ }
+ }
+
+ /**
+ * The first attempt of task_0 slows down by a small factor that should not
+ * trigger a speculation. A speculated attempt should never beat the
+ * original task.
+ * A conservative estimator/speculator will speculate another attempt
+ * because of the slower progress.
+ */
+ public static class SlowingSleepDurationCalcImpl implements
+ SleepDurationCalculator {
+
+ private double threshold = 0.4;
+ private double slowFactor = 1.2;
+
+ SlowingSleepDurationCalcImpl() {
+
+ }
+
+ public long calcSleepDuration(TaskAttemptID taId, int currCount,
+ int totalCount, long defaultSleepDuration) {
+ if ((taId.getTaskType() == TaskType.MAP)
+ && (taId.getTaskID().getId() == 0) && (taId.getId() == 0)) {
+ if (threshold <= ((double) currCount) / totalCount) {
+ return (long) (slowFactor * defaultSleepDuration);
+ }
+ }
+ return defaultSleepDuration;
+ }
+ }
+
+ /**
+ * The first attempt of the first Mapper task sleeps 1000 times longer than
+ * the other tasks.
+ * The speculated attempt should succeed if the estimator detects
+ * the slowdown in time.
+ */
+ public static class StalledSleepDurationCalcImpl implements
+ SleepDurationCalculator {
+
+ StalledSleepDurationCalcImpl() {
+
+ }
+
+ public long calcSleepDuration(TaskAttemptID taId, int currCount,
+ int totalCount, long defaultSleepDuration) {
+ if ((taId.getTaskType() == TaskType.MAP)
+ && (taId.getTaskID().getId() == 0) && (taId.getId() == 0)) {
+ return 1000 * defaultSleepDuration;
+ }
+ return defaultSleepDuration;
+ }
+ }
+
+
+ /**
+ * Emulates the behavior with a step change in the progress.
+ */
+ public static class StepStalledSleepDurationCalcImpl implements
+ SleepDurationCalculator {
+
+ private double threshold = 0.4;
+ private double slowFactor = 10000;
+
+ StepStalledSleepDurationCalcImpl() {
+
+ }
+
+ public long calcSleepDuration(TaskAttemptID taId, int currCount,
+ int totalCount, long defaultSleepDuration) {
+ if ((taId.getTaskType() == TaskType.MAP)
+ && (taId.getTaskID().getId() == 0) && (taId.getId() == 0)) {
+ if (threshold <= ((double) currCount) / totalCount) {
+ return (long) (slowFactor * defaultSleepDuration);
+ }
+ }
+ return defaultSleepDuration;
+ }
+ }
+
+ /**
+ * Dynamically slows down the progress of the first Mapper task.
+ * The speculated attempt should succeed if the estimator detects
+ * the slowdown in time.
+ */
+ public static class DynamicSleepDurationCalcImpl implements
+ SleepDurationCalculator {
+
+ private double[] thresholds;
+ private double[] slowFactors;
+
+ DynamicSleepDurationCalcImpl() {
+ thresholds = new double[] {
+ 0.1, 0.25, 0.4, 0.5, 0.6, 0.65, 0.7, 0.8, 0.9
+ };
+ slowFactors = new double[] {
+ 2.0, 4.0, 5.0, 6.0, 10.0, 15.0, 20.0, 25.0, 30.0
+ };
+ }
+
+ public long calcSleepDuration(TaskAttemptID taId, int currCount,
+ int totalCount,
+ long defaultSleepDuration) {
+ if ((taId.getTaskType() == TaskType.MAP)
+ && (taId.getTaskID().getId() == 0) && (taId.getId() == 0)) {
+ double currProgress = ((double) currCount) / totalCount;
+ double slowFactor = 1.0;
+ for (int i = 0; i < thresholds.length; i++) {
+ if (thresholds[i] >= currProgress) {
+ break;
+ }
+ slowFactor = slowFactors[i];
+ }
+ return (long) (slowFactor * defaultSleepDuration);
+ }
+ return defaultSleepDuration;
+ }
+ }
+
+ /**
+ * Dummy class for testing Speculation. Sleeps for a defined period
+ * of time in mapper. Generates fake input for map / reduce
+ * jobs. Note that generated number of input pairs is in the order
+ * of <code>numMappers * mapSleepTime / 100</code>, so the job uses
+ * some disk space.
+ * The sleep duration of a given task can be lengthened by the configured
+ * SleepDurationCalculator in order to evaluate the estimator.
+ */
+ public static class SpeculativeSleepMapper
+ extends Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
+ private long mapSleepDuration = MAP_SLEEP_TIME_DEFAULT;
+ private int mapSleepCount = 1;
+ private int count = 0;
+ private SleepDurationCalculator sleepCalc = new SleepDurationCalcImpl();
+
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ this.mapSleepCount =
+ conf.getInt(MAP_SLEEP_COUNT, mapSleepCount);
+ this.mapSleepDuration = mapSleepCount == 0 ? 0 :
+ conf.getLong(MAP_SLEEP_TIME, MAP_SLEEP_TIME_DEFAULT) / mapSleepCount;
+ this.sleepCalc =
+ mapSleepTypeMapper.get(conf.get(MAP_SLEEP_CALCULATOR_TYPE,
+ MAP_SLEEP_CALCULATOR_TYPE_DEFAULT));
+
+ }
+
+ public void map(IntWritable key, IntWritable value, Context context)
+ throws IOException, InterruptedException {
+ //it is expected that every map processes mapSleepCount number of records.
+ try {
+ context.setStatus("Sleeping... (" +
+ (mapSleepDuration * (mapSleepCount - count)) + ") ms left");
+ long sleepTime = sleepCalc.calcSleepDuration(context.getTaskAttemptID(),
+ count, mapSleepCount,
+ mapSleepDuration);
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException ex) {
+ throw (IOException) new IOException(
+ "Interrupted while sleeping").initCause(ex);
+ }
+ ++count;
+ // output reduceSleepCount * numReduce number of random values, so that
+ // each reducer will get reduceSleepCount number of keys.
+ int k = key.get();
+ for (int i = 0; i < value.get(); ++i) {
+ context.write(new IntWritable(k + i), NullWritable.get());
+ }
+ }
+ }
+
+ /**
+ * Implementation of the reducer task for testing.
+ */
+ public static class SpeculativeSleepReducer
+ extends Reducer<IntWritable, NullWritable, NullWritable, NullWritable> {
+
+ private long reduceSleepDuration = REDUCE_SLEEP_TIME_DEFAULT;
+ private int reduceSleepCount = 1;
+ private int count = 0;
+
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ this.reduceSleepCount =
+ conf.getInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
+ this.reduceSleepDuration = reduceSleepCount == 0 ? 0 :
+ conf.getLong(REDUCE_SLEEP_TIME, REDUCE_SLEEP_TIME_DEFAULT)
+ / reduceSleepCount;
+ }
+
+ public void reduce(IntWritable key, Iterable<NullWritable> values,
+ Context context)
+ throws IOException {
+ try {
+ context.setStatus("Sleeping... (" +
+ (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
+ Thread.sleep(reduceSleepDuration);
+ } catch (InterruptedException ex) {
+ throw (IOException) new IOException(
+ "Interrupted while sleeping").initCause(ex);
+ }
+ count++;
+ }
+ }
+
+ /**
+ * A class used to map the estimator implementation to the expected
+ * test results.
+ */
+ class EstimatorMetricsPair {
+
+ private Class<?> estimatorClass;
+ private int expectedMapTasks;
+ private int expectedReduceTasks;
+ private boolean speculativeEstimator;
+
+ EstimatorMetricsPair(Class<?> estimatorClass, int mapTasks, int reduceTasks,
+ boolean isToSpeculate) {
+ this.estimatorClass = estimatorClass;
+ this.expectedMapTasks = mapTasks;
+ this.expectedReduceTasks = reduceTasks;
+ this.speculativeEstimator = isToSpeculate;
+ }
+
+ boolean didSpeculate(Counters counters) {
+ long launchedMaps = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+ .getValue();
+ long launchedReduce = counters
+ .findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES)
+ .getValue();
+ boolean isSpeculated =
+ (launchedMaps > expectedMapTasks
+ || launchedReduce > expectedReduceTasks);
+ return isSpeculated;
+ }
+
+ String getErrorMessage(Counters counters) {
+ // Describes the launched-task counters that contradict the expectation.
+ String msg = "Unexpected tasks running estimator "
+ + estimatorClass.getName() + "\n\t";
+ long launchedMaps =
+ counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue();
+ long launchedReduce =
+ counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue();
+ if (speculativeEstimator) {
+ if (launchedMaps <= expectedMapTasks) { // equal: nothing was speculated
+ msg += "maps " + launchedMaps + ", expected: " + expectedMapTasks;
+ }
+ if (launchedReduce <= expectedReduceTasks) {
+ msg += ", reduces " + launchedReduce + ", expected: "
+ + expectedReduceTasks;
+ }
+ } else {
+ if (launchedMaps > expectedMapTasks) {
+ msg += "maps " + launchedMaps + ", expected: " + expectedMapTasks;
+ }
+ if (launchedReduce > expectedReduceTasks) {
+ msg += ", reduces " + launchedReduce + ", expected: "
+ + expectedReduceTasks;
+ }
+ }
+ return msg;
+ }
+ }
+
+ @Test
+ public void testExecDynamicSlowingSpeculative() throws Exception {
+ /*------------------------------------------------------------------
+ * Test that Map/Red speculates because:
+ * 1- all tasks have same progress rate except for task_0
+ * 2- task_0 slows down by dynamic increasing factor
+ * 3- A good estimator should readjust the estimation and the speculator
+ * launches a new task.
+ *
+ * Expected:
+ * A- SimpleExponentialTaskRuntimeEstimator: speculates a successful
+ * attempt to beat the slowing task_0
+ * B- LegacyTaskRuntimeEstimator: speculates an attempt
+ * C- ExponentiallySmoothedTaskRuntimeEstimator: Fails to detect the slow
+ * down and never speculates but it may speculate other tasks
+ * (mappers or reducers)
+ * -----------------------------------------------------------------
+ */
+ chosenSleepCalc = "dynamic_slowing_run";
+
+ if (ignoredTests.contains(chosenSleepCalc)) {
+ return;
+ }
+
+ EstimatorMetricsPair[] estimatorPairs = new EstimatorMetricsPair[] {
+ new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class,
+ myNumMapper, myNumReduce, true),
+ new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class,
+ myNumMapper, myNumReduce, true),
+ new EstimatorMetricsPair(
+ ExponentiallySmoothedTaskRuntimeEstimator.class,
+ myNumMapper, myNumReduce, true)
+ };
+
+ for (EstimatorMetricsPair specEstimator : estimatorPairs) {
+ if (!estimatorClass.equals(specEstimator.estimatorClass)) {
+ continue;
+ }
+ LOG.info("+++ Dynamic Slow Progress testing against " + estimatorClass
+ .getName() + " +++");
+ Job job = runSpecTest();
+
+ boolean succeeded = job.waitForCompletion(true);
+ Assert.assertTrue(
+ "Job expected to succeed with estimator " + estimatorClass.getName(),
+ succeeded);
+ Assert.assertEquals(
+ "Job expected to succeed with estimator " + estimatorClass.getName(),
+ JobStatus.State.SUCCEEDED, job.getJobState());
+ Counters counters = job.getCounters();
+
+ String errorMessage = specEstimator.getErrorMessage(counters);
+ boolean didSpeculate = specEstimator.didSpeculate(counters);
+ Assert.assertEquals(errorMessage, didSpeculate,
+ specEstimator.speculativeEstimator);
+ Assert
+ .assertEquals("Failed maps higher than 0 " + estimatorClass.getName(),
+ 0, counters.findCounter(JobCounter.NUM_FAILED_MAPS).getValue());
+ }
+ }
+
+
+ @Test
+ public void testExecSlowNonSpeculative() throws Exception {
+ /*------------------------------------------------------------------
+ * Test that Map/Red does not speculate because:
+ * 1- all tasks have same progress rate except for task_0
+ * 2- task_0 slows down by a factor of 1.2 after 40% of the workload
+ * 3- A good estimator may adjust the estimation that the task will finish
+ * sooner than a new speculated task.
+ *
+ * Expected:
+ * A- SimpleExponentialTaskRuntimeEstimator: does not speculate because
+ * the new attempt estimated end time is not going to be smaller than the
+ * original end time.
+ * B- LegacyTaskRuntimeEstimator: speculates an attempt
+ * C- ExponentiallySmoothedTaskRuntimeEstimator: speculates an attempt.
+ * -----------------------------------------------------------------
+ */
+ chosenSleepCalc = "slowing_run";
+
+ if (ignoredTests.contains(chosenSleepCalc)) {
+ return;
+ }
+
+ EstimatorMetricsPair[] estimatorPairs = new EstimatorMetricsPair[] {
+ new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class,
+ myNumMapper, myNumReduce, false),
+ new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class,
+ myNumMapper, myNumReduce, true),
+ new EstimatorMetricsPair(
+ ExponentiallySmoothedTaskRuntimeEstimator.class,
+ myNumMapper, myNumReduce, true)
+ };
+
+ for (EstimatorMetricsPair specEstimator : estimatorPairs) {
+ if (!estimatorClass.equals(specEstimator.estimatorClass)) {
+ continue;
+ }
+ LOG.info("+++ Linear Slow Progress Non Speculative testing against "
+ + estimatorClass.getName() + " +++");
+ Job job = runSpecTest();
+
+ boolean succeeded = job.waitForCompletion(true);
+ Assert.assertTrue(
+ "Job expected to succeed with estimator " + estimatorClass.getName(),
+ succeeded);
+ Assert.assertEquals(
+ "Job expected to succeed with estimator " + estimatorClass.getName(),
+ JobStatus.State.SUCCEEDED, job.getJobState());
+ Counters counters = job.getCounters();
+
+ String errorMessage = specEstimator.getErrorMessage(counters);
+ boolean didSpeculate = specEstimator.didSpeculate(counters);
+ Assert.assertEquals(errorMessage, didSpeculate,
+ specEstimator.speculativeEstimator);
+ Assert
+ .assertEquals("Failed maps higher than 0 " + estimatorClass.getName(),
+ 0, counters.findCounter(JobCounter.NUM_FAILED_MAPS).getValue());
+ }
+ }
+
+ @Test
+ public void testExecStepStalledSpeculative() throws Exception {
+ /*------------------------------------------------------------------
+ * Test that Map/Red speculates because:
+ * 1- all tasks have same progress rate except for task_0
+ * 2- task_0 stalls (sleeps 10000 times longer) after 40% of the workload
+ * 3- A good estimator should detect the stall so that the speculator
+ * launches a new attempt.
+ *
+ * Expected:
+ * A- SimpleExponentialTaskRuntimeEstimator: speculates
+ * B- LegacyTaskRuntimeEstimator: speculates
+ * C- ExponentiallySmoothedTaskRuntimeEstimator: speculates
+ * -----------------------------------------------------------------
+ */
+ chosenSleepCalc = "step_stalled_run";
+ if (ignoredTests.contains(chosenSleepCalc)) {
+ return;
+ }
+ EstimatorMetricsPair[] estimatorPairs = new EstimatorMetricsPair[] {
+ new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class,
+ myNumMapper, myNumReduce, true),
+ new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class,
+ myNumMapper, myNumReduce, true),
+ new EstimatorMetricsPair(
+ ExponentiallySmoothedTaskRuntimeEstimator.class,
+ myNumMapper, myNumReduce, true)
+ };
+
+ for (EstimatorMetricsPair specEstimator : estimatorPairs) {
+ if (!estimatorClass.equals(specEstimator.estimatorClass)) {
+ continue;
+ }
+ LOG.info("+++ Stalled Progress testing against "
+ + estimatorClass.getName() + " +++");
+ Job job = runSpecTest();
+
+ boolean succeeded = job.waitForCompletion(true);
+ Assert.assertTrue("Job expected to succeed with estimator "
+ + estimatorClass.getName(), succeeded);
+ Assert.assertEquals("Job expected to succeed with estimator "
+ + estimatorClass.getName(), JobStatus.State.SUCCEEDED,
+ job.getJobState());
+ Counters counters = job.getCounters();
+
+ String errorMessage = specEstimator.getErrorMessage(counters);
+ boolean didSpeculate = specEstimator.didSpeculate(counters);
+ Assert.assertEquals(errorMessage, didSpeculate,
+ specEstimator.speculativeEstimator);
+ Assert.assertEquals("Failed maps higher than 0 "
+ + estimatorClass.getName(), 0,
+ counters.findCounter(JobCounter.NUM_FAILED_MAPS)
+ .getValue());
+ }
+ }
+
+ @Test
+ public void testExecStalledSpeculative() throws Exception {
+ /*------------------------------------------------------------------
+ * Test that Map/Red speculates because:
+ * 1- all tasks have same progress rate except for task_0
+ * 2- task_0 has long sleep duration
+ * 3- A good estimator should detect the stall so that the speculator
+ * launches a new attempt.
+ *
+ * Expected:
+ * A- SimpleExponentialTaskRuntimeEstimator: speculates
+ * B- LegacyTaskRuntimeEstimator: speculates
+ * C- ExponentiallySmoothedTaskRuntimeEstimator: speculates
+ * -----------------------------------------------------------------
+ */
+ chosenSleepCalc = "stalled_run";
+
+ if (ignoredTests.contains(chosenSleepCalc)) {
+ return;
+ }
+ EstimatorMetricsPair[] estimatorPairs = new EstimatorMetricsPair[] {
+ new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class,
+ myNumMapper, myNumReduce, true),
+ new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class,
+ myNumMapper, myNumReduce, true),
+ new EstimatorMetricsPair(
+ ExponentiallySmoothedTaskRuntimeEstimator.class,
+ myNumMapper, myNumReduce, true)
+ };
+
+ for (EstimatorMetricsPair specEstimator : estimatorPairs) {
+ if (!estimatorClass.equals(specEstimator.estimatorClass)) {
+ continue;
+ }
+ LOG.info("+++ Stalled Progress testing against "
+ + estimatorClass.getName() + " +++");
+ Job job = runSpecTest();
+
+ boolean succeeded = job.waitForCompletion(true);
+ Assert.assertTrue("Job expected to succeed with estimator "
+ + estimatorClass.getName(), succeeded);
+ Assert.assertEquals("Job expected to succeed with estimator "
+ + estimatorClass.getName(), JobStatus.State.SUCCEEDED,
+ job.getJobState());
+ Counters counters = job.getCounters();
+
+ String errorMessage = specEstimator.getErrorMessage(counters);
+ boolean didSpeculate = specEstimator.didSpeculate(counters);
+ Assert.assertEquals(errorMessage, didSpeculate,
+ specEstimator.speculativeEstimator);
+ Assert.assertEquals("Failed maps higher than 0 "
+ + estimatorClass.getName(), 0,
+ counters.findCounter(JobCounter.NUM_FAILED_MAPS)
+ .getValue());
+ }
+ }
+
+ @Test
+ public void testExecNonSpeculative() throws Exception {
+ /*------------------------------------------------------------------
+ * Test that Map/Red does not speculate because all tasks progress at the
+ * same rate.
+ *
+ * Expected:
+ * A- SimpleExponentialTaskRuntimeEstimator: does not speculate
+ * B- LegacyTaskRuntimeEstimator: speculates
+ * C- ExponentiallySmoothedTaskRuntimeEstimator: speculates
+ * -----------------------------------------------------------------
+ */
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+ LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ + " not found. Not running test.");
+ return;
+ }
+
+ if (ignoredTests.contains(chosenSleepCalc)) {
+ return;
+ }
+
+ EstimatorMetricsPair[] estimatorPairs = new EstimatorMetricsPair[] {
+ new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class,
+ myNumMapper, myNumReduce, true),
+ new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class,
+ myNumMapper, myNumReduce, false),
+ new EstimatorMetricsPair(
+ ExponentiallySmoothedTaskRuntimeEstimator.class,
+ myNumMapper, myNumReduce, true)
+ };
+
+ for (EstimatorMetricsPair specEstimator : estimatorPairs) {
+ if (!estimatorClass.equals(specEstimator.estimatorClass)) {
+ continue;
+ }
+ LOG.info("+++ No Speculation testing against "
+ + estimatorClass.getName() + " +++");
+ Job job = runSpecTest();
+
+ boolean succeeded = job.waitForCompletion(true);
+ Assert.assertTrue("Job expected to succeed with estimator "
+ + estimatorClass.getName(), succeeded);
+ Assert.assertEquals("Job expected to succeed with estimator "
+ + estimatorClass.getName(), JobStatus.State.SUCCEEDED,
+ job.getJobState());
+ Counters counters = job.getCounters();
+
+ String errorMessage = specEstimator.getErrorMessage(counters);
+ boolean didSpeculate = specEstimator.didSpeculate(counters);
+ Assert.assertEquals(errorMessage, didSpeculate,
+ specEstimator.speculativeEstimator);
+ }
+ }
+
+ private Job runSpecTest()
+ throws IOException, ClassNotFoundException, InterruptedException {
+
+ Configuration conf = mrCluster.getConfig();
+ conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, ENABLE_SPECULATIVE_MAP);
+ conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, ENABLE_SPECULATIVE_REDUCE);
+ conf.setClass(MRJobConfig.MR_AM_TASK_ESTIMATOR,
+ estimatorClass,
+ TaskRuntimeEstimator.class);
+ conf.setLong(MAP_SLEEP_TIME, myMapSleepTime);
+ conf.setLong(REDUCE_SLEEP_TIME, myReduceSleepTime);
+ conf.setInt(MAP_SLEEP_COUNT, myMapSleepCount);
+ conf.setInt(REDUCE_SLEEP_COUNT, myReduceSleepCount);
+ conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0F);
+ conf.setInt(MRJobConfig.NUM_MAPS, myNumMapper);
+ conf.set(MAP_SLEEP_CALCULATOR_TYPE, chosenSleepCalc);
+ Job job = Job.getInstance(conf);
+ job.setJarByClass(TestSpeculativeExecution.class);
+ job.setMapperClass(SpeculativeSleepMapper.class);
+ job.setMapOutputKeyClass(IntWritable.class);
+ job.setMapOutputValueClass(NullWritable.class);
+ job.setReducerClass(SpeculativeSleepReducer.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ job.setInputFormatClass(SpeculativeSleepInputFormat.class);
+ job.setPartitionerClass(SpeculativeSleepJobPartitioner.class);
+ job.setNumReduceTasks(myNumReduce);
+ FileInputFormat.addInputPath(job, new Path("ignored"));
+ // Delete output directory if it exists.
+ try {
+ localFs.delete(TEST_OUT_DIR, true);
+ } catch (IOException e) {
+ // ignore
+ }
+ FileOutputFormat.setOutputPath(job, TEST_OUT_DIR);
+
+ // Creates the Job Configuration
+ job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+ job.setMaxMapAttempts(2);
+
+ job.submit();
+
+ return job;
+ }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
index de171c7..940f142 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
@@ -18,12 +18,17 @@
package org.apache.hadoop.mapreduce.v2;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.SimpleExponentialTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -48,13 +53,31 @@ import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Test;
import com.google.common.base.Supplier;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
@SuppressWarnings({ "unchecked", "rawtypes" })
+@RunWith(Parameterized.class)
public class TestSpeculativeExecutionWithMRApp {
private static final int NUM_MAPPERS = 5;
private static final int NUM_REDUCERS = 0;
+ @Parameterized.Parameters(name = "{index}: TaskEstimator(EstimatorClass {0})")
+ public static Collection<Object[]> getTestParameters() {
+ // One single-element row per estimator implementation under test.
+ final Object[] simpleExp = {SimpleExponentialTaskRuntimeEstimator.class};
+ final Object[] legacy = {LegacyTaskRuntimeEstimator.class};
+ return Arrays.asList(simpleExp, legacy);
+ }
+
+ private Class<? extends TaskRuntimeEstimator> estimatorClass;
+
+ public TestSpeculativeExecutionWithMRApp(
+ Class<? extends TaskRuntimeEstimator> estimatorKlass) {
+ estimatorClass = estimatorKlass; // estimator configured by this instance
+ }
+
@Test
public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception {
@@ -64,7 +87,7 @@ public class TestSpeculativeExecutionWithMRApp {
MRApp app =
new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true, clock);
- Job job = app.submit(new Configuration(), true, true);
+ Job job = app.submit(createConfiguration(), true, true);
app.waitForState(job, JobState.RUNNING);
Map<TaskId, Task> tasks = job.getTasks();
@@ -136,7 +159,7 @@ public class TestSpeculativeExecutionWithMRApp {
MRApp app =
new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true, clock);
- Job job = app.submit(new Configuration(), true, true);
+ Job job = app.submit(createConfiguration(), true, true);
app.waitForState(job, JobState.RUNNING);
Map<TaskId, Task> tasks = job.getTasks();
@@ -191,6 +214,9 @@ public class TestSpeculativeExecutionWithMRApp {
}
clock.setTime(System.currentTimeMillis() + 15000);
+ // give a chance to the speculator thread to run a scan before we proceed
+ // with updating events
+ Thread.yield();
for (Map.Entry<TaskId, Task> task : tasks.entrySet()) {
for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : task.getValue()
.getAttempts().entrySet()) {
@@ -251,4 +277,20 @@ public class TestSpeculativeExecutionWithMRApp {
status.taskState = state;
return status;
}
+
+ private Configuration createConfiguration() {
+ final Configuration jobConf = new Configuration();
+ // Select the estimator implementation this test instance was given.
+ jobConf.setClass(MRJobConfig.MR_AM_TASK_ESTIMATOR, estimatorClass,
+ TaskRuntimeEstimator.class);
+ if (!SimpleExponentialTaskRuntimeEstimator.class.equals(estimatorClass)) {
+ return jobConf;
+ }
+ // Configurations specific to the SimpleExponential estimator.
+ jobConf.setInt(
+ MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_SKIP_INITIALS, 1);
+ jobConf.setLong(
+ MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS, 10 * 1000L);
+ return jobConf;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org