You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2009/07/21 08:13:50 UTC

svn commit: r796154 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/

Author: ddas
Date: Tue Jul 21 06:13:50 2009
New Revision: 796154

URL: http://svn.apache.org/viewvc?rev=796154&view=rev
Log:
MAPREDUCE-717. Fixes some corner case issues in speculative execution heuristics. Contributed by Devaraj Das.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=796154&r1=796153&r2=796154&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Jul 21 06:13:50 2009
@@ -239,3 +239,6 @@
     MAPREDUCE-430. Fix bug related to Task getting stuck due to
     OutOfMemoryErrors. (Amar Kamat via sharad)
 
+    MAPREDUCE-717. Fixes some corner case issues in speculative 
+    execution heuristics. (Devaraj Das)
+

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=796154&r1=796153&r2=796154&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Jul 21 06:13:50 2009
@@ -363,8 +363,10 @@
     this.jobMetrics.setTag("sessionId", conf.getSessionId());
     this.jobMetrics.setTag("jobName", conf.getJobName());
     this.jobMetrics.setTag("jobId", jobid.toString());
-    hasSpeculativeMaps = conf.getMapSpeculativeExecution();
-    hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+    if (!hasRestarted()) { //This is temporary until we fix the restart model
+      hasSpeculativeMaps = conf.getMapSpeculativeExecution();
+      hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+    }
     this.maxLevel = jobtracker.getNumTaskCacheLevels();
     this.anyCacheLevel = this.maxLevel+1;
     this.nonLocalMaps = new LinkedList<TaskInProgress>();
@@ -1913,16 +1915,17 @@
    * @param list pool of tasks to choose from
    * @param taskTrackerName the name of the TaskTracker asking for a task
    * @param taskTrackerHost the hostname of the TaskTracker asking for a task
+   * @param taskType the type of task (MAP/REDUCE) that we are considering
    * @return the TIP to speculatively re-execute
    */
   protected synchronized TaskInProgress findSpeculativeTask(
       Collection<TaskInProgress> list, String taskTrackerName, 
-      String taskTrackerHost) {
+      String taskTrackerHost, TaskType taskType) {
     if (list.isEmpty()) {
       return null;
     }
     long now = JobTracker.getClock().getTime();
-    if (isSlowTracker(taskTrackerName) || atSpeculativeCap(list)) {
+    if (isSlowTracker(taskTrackerName) || atSpeculativeCap(list, taskType)) {
       return null;
     }
     // List of speculatable candidates, start with all, and chop it down
@@ -2141,7 +2144,7 @@
     
     ///////// Select a TIP to run on
     TaskInProgress tip = findSpeculativeTask(allTips, taskTrackerName, 
-        taskTrackerHost);
+        taskTrackerHost, TaskType.MAP);
     
     if (tip != null) {
       LOG.info("Choosing map task " + tip.getTIPId() + 
@@ -2209,7 +2212,7 @@
   private synchronized TaskInProgress getSpeculativeReduce(
       String taskTrackerName, String taskTrackerHost) {
     TaskInProgress tip = findSpeculativeTask(
-        runningReduces, taskTrackerName, taskTrackerHost);
+        runningReduces, taskTrackerName, taskTrackerHost, TaskType.REDUCE);
     if (tip != null) {
       LOG.info("Choosing reduce task " + tip.getTIPId() + 
           " for speculative execution");
@@ -2223,31 +2226,32 @@
      * Check to see if the maximum number of speculative tasks are
      * already being executed currently.
      * @param tasks the set of tasks to test
+     * @param type the type of task (MAP/REDUCE) that we are considering
      * @return has the cap been reached?
      */
-   private boolean atSpeculativeCap(Collection<TaskInProgress> tasks) {
+   private boolean atSpeculativeCap(Collection<TaskInProgress> tasks, 
+       TaskType type) {
      float numTasks = tasks.size();
      if (numTasks == 0){
        return true; // avoid divide by zero
      }
-
+     int speculativeTaskCount = type == TaskType.MAP ? speculativeMapTasks 
+         : speculativeReduceTasks;
      //return true if totalSpecTask < max(10, 0.01 * total-slots, 
      //                                   0.1 * total-running-tasks)
 
-     if (speculativeMapTasks + speculativeReduceTasks < MIN_SPEC_CAP) {
+     if (speculativeTaskCount < MIN_SPEC_CAP) {
        return false; // at least one slow tracker's worth of slots(default=10)
      }
      ClusterStatus c = jobtracker.getClusterStatus(false); 
-     int numSlots = c.getMaxMapTasks() + c.getMaxReduceTasks();
-     if ((float)(speculativeMapTasks + speculativeReduceTasks) < 
-       numSlots * MIN_SLOTS_CAP) {
+     int numSlots = (type == TaskType.MAP ? c.getMaxMapTasks() : c.getMaxReduceTasks());
+     if ((float)speculativeTaskCount < numSlots * MIN_SLOTS_CAP) {
        return false;
      }
-     boolean atCap = (((float)(speculativeMapTasks+
-         speculativeReduceTasks)/numTasks) >= speculativeCap);
+     boolean atCap = (((float)(speculativeTaskCount)/numTasks) >= speculativeCap);
      if (LOG.isDebugEnabled()) {
        LOG.debug("SpeculativeCap is "+speculativeCap+", specTasks/numTasks is " +
-           ((float)(speculativeMapTasks+speculativeReduceTasks)/numTasks)+
+           ((float)(speculativeTaskCount)/numTasks)+
            ", so atSpecCap() is returning "+atCap);
      }
      return atCap;
@@ -2355,8 +2359,8 @@
     }
     private void sub(double oldNum) {
       this.count--;
-      this.sum -= oldNum;
-      this.sumSquares -= oldNum * oldNum;
+      this.sum = Math.max(this.sum -= oldNum, 0.0d);
+      this.sumSquares = Math.max(this.sumSquares -= oldNum * oldNum, 0.0d);
     }
     
     public double mean() {
@@ -2365,7 +2369,7 @@
   
     public double var() {
       // E(X^2) - E(X)^2
-      return (sumSquares/count) - mean() * mean();
+      return Math.max((sumSquares/count) - mean() * mean(), 0.0d);
     }
     
     public double std() {
@@ -2511,7 +2515,7 @@
       runningMapTasks -= 1;
       finishedMapTasks += 1;
       metrics.completeMap(taskid);
-      if (hasSpeculativeMaps) {
+      if (!tip.isJobSetupTask() && hasSpeculativeMaps) {
         updateTaskTrackerStats(tip,ttStatus,trackerMapStats,mapTaskStats);
       }
       // remove the completed map from the resp running caches
@@ -2523,7 +2527,7 @@
       runningReduceTasks -= 1;
       finishedReduceTasks += 1;
       metrics.completeReduce(taskid);
-      if (hasSpeculativeReduces) {
+      if (!tip.isJobSetupTask() && hasSpeculativeReduces) {
         updateTaskTrackerStats(tip,ttStatus,trackerReduceStats,reduceTaskStats);
       }
       // remove the completed reduces from the running reducers set
@@ -2542,7 +2546,7 @@
   
   private void updateTaskTrackerStats(TaskInProgress tip, TaskTrackerStatus ttStatus, 
       Map<String,DataStatistics> trackerStats, DataStatistics overallStats) {
-    float tipDuration = tip.getExecFinishTime()-tip.getDispatchTime();
+    float tipDuration = tip.getExecFinishTime()-tip.getDispatchTime(tip.getSuccessfulTaskid());
     DataStatistics ttStats = 
       trackerStats.get(ttStatus.getTrackerName());
     double oldMean = 0.0d;
@@ -2773,12 +2777,14 @@
     if (wasSpeculating) {
       if (tip.isMapTask()) {
         speculativeMapTasks--;
-        LOG.debug("Decrement count. Current speculativeMap task count: " +
-            speculativeMapTasks);
+        LOG.debug("Decremented count for " + 
+            tip.getTIPId()+"/"+tip.getJob().getJobID() + 
+          ". Current speculativeMap task count: " + speculativeMapTasks);
       } else {
         speculativeReduceTasks--;
-        LOG.debug("Decremented count. Current speculativeReduce task count: " + 
-            speculativeReduceTasks);
+        LOG.debug("Decremented count for " +
+            tip.getTIPId()+"/"+tip.getJob().getJobID() +
+          ". Current speculativeReduce task count: " + speculativeReduceTasks);
       }
     }
   }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=796154&r1=796153&r2=796154&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Jul 21 06:13:50 2009
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -77,7 +78,7 @@
   private double progress = 0;
   private double oldProgressRate;
   private String state = "";
-  private long dispatchTime = 0;   // most recent time task attempt given to TT
+  private long lastDispatchTime = 0;   // most recent time task attempt given to TT
   private long execStartTime = 0;  // when we started first task-attempt
   private long execFinishTime = 0;
   private int completes = 0;
@@ -128,6 +129,9 @@
   
   private Counters counters = new Counters();
   
+  private HashMap<TaskAttemptID, Long> dispatchTimeMap = 
+    new HashMap<TaskAttemptID, Long>();
+  
 
   /**
    * Constructor for MapTask
@@ -237,15 +241,24 @@
   /**
    * Return the dispatch time
    */
-  public long getDispatchTime(){
-    return this.dispatchTime;
+  public long getDispatchTime(TaskAttemptID taskid){
+    Long l = dispatchTimeMap.get(taskid);
+    if (l != null) {
+      return l.longValue();
+    }
+    return 0;
+  }
+
+  public long getLastDispatchTime(){
+    return this.lastDispatchTime;
   }
   
   /**
    * Set the dispatch time
    */
-  public void setDispatchTime(long disTime){
-    this.dispatchTime = disTime;
+  public void setDispatchTime(TaskAttemptID taskid, long disTime){
+    dispatchTimeMap.put(taskid, disTime);
+    this.lastDispatchTime = disTime;
   }
   
   /**
@@ -581,8 +594,9 @@
     // but finishTime has to be updated.
     if (!isCleanupAttempt(taskid)) {
       taskStatuses.put(taskid, status);
-      if ((isMapTask() && job.hasSpeculativeMaps()) || 
-          (!isMapTask() && job.hasSpeculativeReduces())) {
+      //we don't want to include setup tasks in the task execution stats
+      if (!isJobSetupTask() && ((isMapTask() && job.hasSpeculativeMaps()) || 
+          (!isMapTask() && job.hasSpeculativeReduces()))) {
         long now = JobTracker.getClock().getTime();
         double oldProgRate = getOldProgressRate();
         double currProgRate = getCurrentProgressRate(now);
@@ -906,7 +920,7 @@
     }
     return (!skipping && isRunnable() && isRunning() &&
         activeTasks.size() <= MAX_TASK_EXECS &&
-        currentTime - dispatchTime >= SPECULATIVE_LAG &&
+        currentTime - lastDispatchTime >= SPECULATIVE_LAG &&
         completes == 0 && !isOnlyCommitPending() &&
         (taskStats.mean() - getCurrentProgressRate(currentTime) >
               taskStats.std() * job.getSlowTaskThreshold()));
@@ -939,13 +953,13 @@
 
     //keep track of the last time we started an attempt at this TIP
     //used to calculate the progress rate of this TIP
-    setDispatchTime(JobTracker.getClock().getTime());
+    setDispatchTime(taskid, JobTracker.getClock().getTime());
  
     //set this the first time we run a taskAttempt in this TIP
     //each Task attempt has its own TaskStatus, which tracks that
     //attempts execStartTime, thus this startTime is TIP wide.
     if (0 == execStartTime){
-      setExecStartTime(dispatchTime);
+      setExecStartTime(lastDispatchTime);
     }
     return addRunningTask(taskid, taskTracker);
   }
@@ -1139,12 +1153,14 @@
   public double getCurrentProgressRate(long currentTime) {
     double bestProgressRate = 0;
     for (TaskStatus ts : taskStatuses.values()){
-      double progressRate = ts.getProgress()/Math.max(1,
-          currentTime - dispatchTime);
-      if ((ts.getRunState() == TaskStatus.State.RUNNING  || 
-          ts.getRunState() == TaskStatus.State.SUCCEEDED) &&
-          progressRate > bestProgressRate){
-        bestProgressRate = progressRate;
+      if (ts.getRunState() == TaskStatus.State.RUNNING  || 
+          ts.getRunState() == TaskStatus.State.SUCCEEDED ||
+          ts.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+        double progressRate = ts.getProgress()/Math.max(1,
+            currentTime - getDispatchTime(ts.getTaskID()));
+        if (progressRate > bestProgressRate){
+          bestProgressRate = progressRate;
+        }
       }
     }
     return bestProgressRate;

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=796154&r1=796153&r2=796154&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java Tue Jul 21 06:13:50 2009
@@ -25,6 +25,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.TaskStatus.Phase;
+import org.apache.hadoop.mapreduce.TaskType;
 
 /** 
  * Utilities used in unit test.
@@ -94,7 +95,7 @@
     
     private TaskAttemptID findTask(String trackerName, String trackerHost,
         Collection<TaskInProgress> nonRunningTasks, 
-        Collection<TaskInProgress> runningTasks)
+        Collection<TaskInProgress> runningTasks, TaskType taskType)
     throws IOException {
       TaskInProgress tip = null;
       Iterator<TaskInProgress> iter = nonRunningTasks.iterator();
@@ -110,7 +111,8 @@
       }
       if (tip == null) {
         if (getJobConf().getSpeculativeExecution()) {
-          tip = findSpeculativeTask(runningTasks, trackerName, trackerHost);
+          tip = findSpeculativeTask(runningTasks, trackerName, trackerHost,
+              taskType);
         }
       }
       if (tip != null) {
@@ -131,14 +133,14 @@
     throws IOException {
       return findTask(trackerName, 
           JobInProgress.convertTrackerNameToHostName(trackerName),
-          nonLocalMaps, nonLocalRunningMaps);
+          nonLocalMaps, nonLocalRunningMaps, TaskType.MAP);
     }
 
     public TaskAttemptID findReduceTask(String trackerName) 
     throws IOException {
       return findTask(trackerName, 
           JobInProgress.convertTrackerNameToHostName(trackerName),
-          nonRunningReduces, runningReduces);
+          nonRunningReduces, runningReduces, TaskType.REDUCE);
     }
 
     public void finishTask(TaskAttemptID taskId) {

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java?rev=796154&r1=796153&r2=796154&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java Tue Jul 21 06:13:50 2009
@@ -34,9 +34,8 @@
   static FakeJobTracker jobTracker;
   static class SpecFakeClock extends FakeClock {
     long SPECULATIVE_LAG = TaskInProgress.SPECULATIVE_LAG;
-    @Override
-    public void advance(long millis) {
-      time += millis + SPECULATIVE_LAG;
+    public void advanceBySpeculativeLag() {
+      time += SPECULATIVE_LAG;
     }
   };
   static SpecFakeClock clock;
@@ -124,6 +123,7 @@
     clock.advance(1000);
     job.finishTask(taskAttemptID[1]);
     clock.advance(20000);
+    clock.advanceBySpeculativeLag();
     //we should get a speculative task now
     taskAttemptID[5] = job.findReduceTask(trackers[4]);
     assertEquals(taskAttemptID[5].getTaskID().getId(),2);
@@ -157,12 +157,12 @@
     job.finishTask(taskAttemptID[0]);
     job.finishTask(taskAttemptID[1]);
     job.finishTask(taskAttemptID[2]);
-    clock.advance(28000);
+    clock.advance(250000);
     taskAttemptID[4] = job.findMapTask(trackers[3]);
-    clock.advance(5000);
+    clock.advanceBySpeculativeLag();
     //by doing the above clock adjustments, we bring the progress rate of 
-    //taskID 3 lower than 4. For taskID 3, the rate is 85/35000
-    //and for taskID 4, the rate is 20/5000. But when we ask for a spec task
+    //taskID 3 lower than 4. For taskID 3, the rate is 85/317000
+    //and for taskID 4, the rate is 20/65000. But when we ask for a spec task
     //now, we should get back taskID 4 (since that is expected to complete
     //later than taskID 3).
     job.progressMade(taskAttemptID[3], 0.85f);
@@ -186,7 +186,7 @@
     //Tests the fact that the max tasks launched is 10
     assertEquals(speculativeCap(1200,1150,20), 10);
     //Tests the fact that the max tasks launched is 0.01 * #slots
-    assertEquals(speculativeCap(1200,1150,2000), 20);
+    assertEquals(speculativeCap(1200,1150,4000), 20);
   }
   
   private int speculativeCap(int totalTasks, int numEarlyComplete, int slots)
@@ -207,6 +207,9 @@
     for (i = 0; i < numEarlyComplete; i++) {
       job.finishTask(taskAttemptID[i]);
     }
+
+    clock.advanceBySpeculativeLag();
+
     for (i = numEarlyComplete; i < totalTasks; i++) {
       job.progressMade(taskAttemptID[i], 0.85f);
     }