You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2013/10/04 19:29:22 UTC
svn commit: r1529230 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/...
Author: vinodkv
Date: Fri Oct 4 17:29:21 2013
New Revision: 1529230
URL: http://svn.apache.org/r1529230
Log:
MAPREDUCE-5533. Fixed MR speculation code to track any TaskAttempts that aren't heart-beating for a while, so that we can aggressively speculate instead of waiting for task-timeout. Contributed by Xuan Gong.
svn merge --ignore-ancestry -c 1529229 ../../trunk/
Added:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
- copied unchanged from r1529229, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1529230&r1=1529229&r2=1529230&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Fri Oct 4 17:29:21 2013
@@ -141,6 +141,10 @@ Release 2.1.2 - UNRELEASED
MAPREDUCE-5442. $HADOOP_MAPRED_HOME/$HADOOP_CONF_DIR setting not working on
Windows. (Yingda Chen via cnauroth)
+ MAPREDUCE-5533. Fixed MR speculation code to track any TaskAttempts that
+ aren't heart-beating for a while, so that we can aggressively speculate
+ instead of waiting for task-timeout (Xuan Gong via vinodkv)
+
Release 2.1.1-beta - 2013-09-23
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java?rev=1529230&r1=1529229&r2=1529230&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java Fri Oct 4 17:29:21 2013
@@ -78,6 +78,16 @@ public class DefaultSpeculator extends A
private final Map<Task, AtomicBoolean> pendingSpeculations
= new ConcurrentHashMap<Task, AtomicBoolean>();
+ // Used to track any TaskAttempts that aren't heart-beating for a while, so
+ // that we can aggressively speculate instead of waiting for task-timeout.
+ private final ConcurrentMap<TaskAttemptId, TaskAttemptHistoryStatistics>
+ runningTaskAttemptStatistics = new ConcurrentHashMap<TaskAttemptId,
+ TaskAttemptHistoryStatistics>();
+ // Regular heartbeat from tasks is every 3 secs. So if we don't get a
+ // heartbeat in 9 secs (3 heartbeats), we simulate a heartbeat with no change
+ // in progress.
+ private static final long MAX_WAITTING_TIME_FOR_HEARTBEAT = 9 * 1000;
+
// These are the current needs, not the initial needs. For each job, these
// record the number of attempts that exist and that are actively
// waiting for a container [as opposed to running or finished]
@@ -329,6 +339,9 @@ public class DefaultSpeculator extends A
runningTasks.putIfAbsent(taskID, Boolean.TRUE);
} else {
runningTasks.remove(taskID, Boolean.TRUE);
+ if (!stateString.equals(TaskAttemptState.STARTING.name())) {
+ runningTaskAttemptStatistics.remove(attemptID);
+ }
}
}
@@ -389,6 +402,33 @@ public class DefaultSpeculator extends A
long estimatedReplacementEndTime
= now + estimator.estimatedNewAttemptRuntime(taskID);
+ float progress = taskAttempt.getProgress();
+ TaskAttemptHistoryStatistics data =
+ runningTaskAttemptStatistics.get(runningTaskAttemptID);
+ if (data == null) {
+ runningTaskAttemptStatistics.put(runningTaskAttemptID,
+ new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now));
+ } else {
+ if (estimatedRunTime == data.getEstimatedRunTime()
+ && progress == data.getProgress()) {
+ // Previous stats are the same as the current stats
+ if (data.notHeartbeatedInAWhile(now)) {
+ // Stats have stagnated for a while, simulate heart-beat.
+ TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
+ taskAttemptStatus.id = runningTaskAttemptID;
+ taskAttemptStatus.progress = progress;
+ taskAttemptStatus.taskState = taskAttempt.getState();
+ // Now simulate the heart-beat
+ handleAttempt(taskAttemptStatus);
+ }
+ } else {
+ // Stats have changed - update our data structure
+ data.setEstimatedRunTime(estimatedRunTime);
+ data.setProgress(progress);
+ data.resetHeartBeatTime(now);
+ }
+ }
+
if (estimatedEndTime < now) {
return PROGRESS_IS_GOOD;
}
@@ -511,4 +551,47 @@ public class DefaultSpeculator extends A
// We'll try to issue one map and one reduce speculation per job per run
return maybeScheduleAMapSpeculation() + maybeScheduleAReduceSpeculation();
}
+
+ static class TaskAttemptHistoryStatistics {
+ // Last-seen estimated run time and progress of an attempt, plus a heartbeat timestamp.
+ private long estimatedRunTime;
+ private float progress;
+ private long lastHeartBeatTime;
+ // The third constructor argument seeds lastHeartBeatTime via resetHeartBeatTime.
+ public TaskAttemptHistoryStatistics(long estimatedRunTime, float progress,
+ long nonProgressStartTime) {
+ this.estimatedRunTime = estimatedRunTime;
+ this.progress = progress;
+ resetHeartBeatTime(nonProgressStartTime);
+ }
+
+ public long getEstimatedRunTime() {
+ return this.estimatedRunTime;
+ }
+
+ public float getProgress() {
+ return this.progress;
+ }
+
+ public void setEstimatedRunTime(long estimatedRunTime) {
+ this.estimatedRunTime = estimatedRunTime;
+ }
+
+ public void setProgress(float progress) {
+ this.progress = progress;
+ }
+ // True only when now is more than MAX_WAITTING_TIME_FOR_HEARTBEAT past lastHeartBeatTime.
+ public boolean notHeartbeatedInAWhile(long now) {
+ if (now - lastHeartBeatTime <= MAX_WAITTING_TIME_FOR_HEARTBEAT) {
+ return false;
+ } else {
+ resetHeartBeatTime(now); // side effect: the next true needs another full interval
+ return true;
+ }
+ }
+ // Overwrites lastHeartBeatTime with the supplied time.
+ public void resetHeartBeatTime(long lastHeartBeatTime) {
+ this.lastHeartBeatTime = lastHeartBeatTime;
+ }
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1529230&r1=1529229&r2=1529230&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Fri Oct 4 17:29:21 2013
@@ -263,16 +263,22 @@ public class MRApp extends MRAppMaster {
}
public Job submit(Configuration conf) throws Exception {
+ //TODO: fix the bug where the speculator gets events with
+ //not-fully-constructed objects. For now, disable speculative exec
+ return submit(conf, false, false);
+ }
+
+ public Job submit(Configuration conf, boolean mapSpeculative,
+ boolean reduceSpeculative) throws Exception {
String user = conf.get(MRJobConfig.USER_NAME, UserGroupInformation
- .getCurrentUser().getShortUserName());
+ .getCurrentUser().getShortUserName());
conf.set(MRJobConfig.USER_NAME, user);
conf.set(MRJobConfig.MR_AM_STAGING_DIR, testAbsPath.toString());
conf.setBoolean(MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR, true);
- //TODO: fix the bug where the speculator gets events with
- //not-fully-constructed objects. For now, disable speculative exec
- LOG.info("****DISABLING SPECULATIVE EXECUTION*****");
- conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
- conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+ // TODO: fix the bug where the speculator gets events with
+ // not-fully-constructed objects. For now, disable speculative exec
+ conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, mapSpeculative);
+ conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, reduceSpeculative);
init(conf);
start();
@@ -281,7 +287,7 @@ public class MRApp extends MRAppMaster {
// Write job.xml
String jobFile = MRApps.getJobFile(conf, user,
- TypeConverter.fromYarn(job.getID()));
+ TypeConverter.fromYarn(job.getID()));
LOG.info("Writing job conf to " + jobFile);
new File(jobFile).getParentFile().mkdirs();
conf.writeXml(new FileOutputStream(jobFile));