You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text conversion.)
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2013/10/04 19:29:22 UTC

svn commit: r1529230 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/...

Author: vinodkv
Date: Fri Oct  4 17:29:21 2013
New Revision: 1529230

URL: http://svn.apache.org/r1529230
Log:
MAPREDUCE-5533. Fixed MR speculation code to track any TaskAttempts that aren't heart-beating for a while, so that we can aggressively speculate instead of waiting for task-timeout. Contributed by Xuan Gong.
svn merge --ignore-ancestry -c 1529229 ../../trunk/

Added:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
      - copied unchanged from r1529229, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1529230&r1=1529229&r2=1529230&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Fri Oct  4 17:29:21 2013
@@ -141,6 +141,10 @@ Release 2.1.2 - UNRELEASED
     MAPREDUCE-5442. $HADOOP_MAPRED_HOME/$HADOOP_CONF_DIR setting not working on
     Windows. (Yingda Chen via cnauroth)
 
+    MAPREDUCE-5533. Fixed MR speculation code to track any TaskAttempts that
+    aren't heart-beating for a while, so that we can aggressively speculate
+    instead of waiting for task-timeout (Xuan Gong via vinodkv)
+
 Release 2.1.1-beta - 2013-09-23
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java?rev=1529230&r1=1529229&r2=1529230&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java Fri Oct  4 17:29:21 2013
@@ -78,6 +78,16 @@ public class DefaultSpeculator extends A
   private final Map<Task, AtomicBoolean> pendingSpeculations
       = new ConcurrentHashMap<Task, AtomicBoolean>();
 
+  // Used to track any TaskAttempts that aren't heart-beating for a while, so
+  // that we can aggressively speculate instead of waiting for task-timeout.
+  private final ConcurrentMap<TaskAttemptId, TaskAttemptHistoryStatistics>
+      runningTaskAttemptStatistics = new ConcurrentHashMap<TaskAttemptId,
+          TaskAttemptHistoryStatistics>();
+  // Regular heartbeat from tasks is every 3 secs. So if we don't get a
+  // heartbeat in 9 secs (3 heartbeats), we simulate a heartbeat with no change
+  // in progress.
+  private static final long MAX_WAITTING_TIME_FOR_HEARTBEAT = 9 * 1000;
+
   // These are the current needs, not the initial needs.  For each job, these
   //  record the number of attempts that exist and that are actively
   //  waiting for a container [as opposed to running or finished]
@@ -329,6 +339,9 @@ public class DefaultSpeculator extends A
       runningTasks.putIfAbsent(taskID, Boolean.TRUE);
     } else {
       runningTasks.remove(taskID, Boolean.TRUE);
+      if (!stateString.equals(TaskAttemptState.STARTING.name())) {
+        runningTaskAttemptStatistics.remove(attemptID);
+      }
     }
   }
 
@@ -389,6 +402,33 @@ public class DefaultSpeculator extends A
         long estimatedReplacementEndTime
             = now + estimator.estimatedNewAttemptRuntime(taskID);
 
+        float progress = taskAttempt.getProgress();
+        TaskAttemptHistoryStatistics data =
+            runningTaskAttemptStatistics.get(runningTaskAttemptID);
+        if (data == null) {
+          runningTaskAttemptStatistics.put(runningTaskAttemptID,
+            new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now));
+        } else {
+          if (estimatedRunTime == data.getEstimatedRunTime()
+              && progress == data.getProgress()) {
+            // Previous stats are same as same stats
+            if (data.notHeartbeatedInAWhile(now)) {
+              // Stats have stagnated for a while, simulate heart-beat.
+              TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
+              taskAttemptStatus.id = runningTaskAttemptID;
+              taskAttemptStatus.progress = progress;
+              taskAttemptStatus.taskState = taskAttempt.getState();
+              // Now simulate the heart-beat
+              handleAttempt(taskAttemptStatus);
+            }
+          } else {
+            // Stats have changed - update our data structure
+            data.setEstimatedRunTime(estimatedRunTime);
+            data.setProgress(progress);
+            data.resetHeartBeatTime(now);
+          }
+        }
+
         if (estimatedEndTime < now) {
           return PROGRESS_IS_GOOD;
         }
@@ -511,4 +551,47 @@ public class DefaultSpeculator extends A
     // We'll try to issue one map and one reduce speculation per job per run
     return maybeScheduleAMapSpeculation() + maybeScheduleAReduceSpeculation();
   }
+
+  static class TaskAttemptHistoryStatistics {
+
+    private long estimatedRunTime;
+    private float progress;
+    private long lastHeartBeatTime;
+
+    public TaskAttemptHistoryStatistics(long estimatedRunTime, float progress,
+        long nonProgressStartTime) {
+      this.estimatedRunTime = estimatedRunTime;
+      this.progress = progress;
+      resetHeartBeatTime(nonProgressStartTime);
+    }
+
+    public long getEstimatedRunTime() {
+      return this.estimatedRunTime;
+    }
+
+    public float getProgress() {
+      return this.progress;
+    }
+
+    public void setEstimatedRunTime(long estimatedRunTime) {
+      this.estimatedRunTime = estimatedRunTime;
+    }
+
+    public void setProgress(float progress) {
+      this.progress = progress;
+    }
+
+    public boolean notHeartbeatedInAWhile(long now) {
+      if (now - lastHeartBeatTime <= MAX_WAITTING_TIME_FOR_HEARTBEAT) {
+        return false;
+      } else {
+        resetHeartBeatTime(now);
+        return true;
+      }
+    }
+
+    public void resetHeartBeatTime(long lastHeartBeatTime) {
+      this.lastHeartBeatTime = lastHeartBeatTime;
+    }
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1529230&r1=1529229&r2=1529230&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Fri Oct  4 17:29:21 2013
@@ -263,16 +263,22 @@ public class MRApp extends MRAppMaster {
   }
 
   public Job submit(Configuration conf) throws Exception {
+    //TODO: fix the bug where the speculator gets events with 
+    //not-fully-constructed objects. For now, disable speculative exec
+    return submit(conf, false, false);
+  }
+
+  public Job submit(Configuration conf, boolean mapSpeculative,
+      boolean reduceSpeculative) throws Exception {
     String user = conf.get(MRJobConfig.USER_NAME, UserGroupInformation
-      .getCurrentUser().getShortUserName());
+        .getCurrentUser().getShortUserName());
     conf.set(MRJobConfig.USER_NAME, user);
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, testAbsPath.toString());
     conf.setBoolean(MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR, true);
-    //TODO: fix the bug where the speculator gets events with 
-    //not-fully-constructed objects. For now, disable speculative exec
-    LOG.info("****DISABLING SPECULATIVE EXECUTION*****");
-    conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
-    conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+    // TODO: fix the bug where the speculator gets events with
+    // not-fully-constructed objects. For now, disable speculative exec
+    conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, mapSpeculative);
+    conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, reduceSpeculative);
 
     init(conf);
     start();
@@ -281,7 +287,7 @@ public class MRApp extends MRAppMaster {
 
     // Write job.xml
     String jobFile = MRApps.getJobFile(conf, user,
-      TypeConverter.fromYarn(job.getID()));
+        TypeConverter.fromYarn(job.getID()));
     LOG.info("Writing job conf to " + jobFile);
     new File(jobFile).getParentFile().mkdirs();
     conf.writeXml(new FileOutputStream(jobFile));