Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/06/06 08:26:58 UTC

svn commit: r663837 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/ReduceTask.java

Author: ddas
Date: Thu Jun  5 23:26:58 2008
New Revision: 663837

URL: http://svn.apache.org/viewvc?rev=663837&view=rev
Log:
HADOOP-3427. Improves the shuffle scheduler. It now waits for notifications from shuffle threads when it has scheduled enough, before scheduling more. Contributed by Devaraj Das.
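
In effect the scheduler now throttles itself against a shared results list:
while fewer than maxInFlight fetches are outstanding it keeps scheduling, and
once it passes that mark it blocks on the list until a copier thread posts a
result and notifies it. The sketch below shows that wait/notify pattern in
isolation; the class and method names (ThrottledScheduler, Result,
postResult) are hypothetical illustrations, not the actual ReduceTask code in
the diff that follows.

    import java.util.ArrayList;
    import java.util.List;

    class ThrottledScheduler {
      static class Result { /* outcome of one fetch */ }

      private final List<Result> results = new ArrayList<Result>();
      private final int maxInFlight;

      ThrottledScheduler(int numCopiers) {
        // same heuristic as the patch: a few outstanding fetches per copier
        this.maxInFlight = 4 * numCopiers;
      }

      // Called by the scheduler loop. Blocks only when enough fetches are
      // already in flight; otherwise returns null so the caller can go and
      // pick up more map-completion events.
      Result getResult(int numInFlight) {
        synchronized (results) {
          while (results.isEmpty()) {
            if (numInFlight > maxInFlight) {
              try {
                results.wait();      // wait for a copier to notify us
              } catch (InterruptedException e) { }
            } else {
              return null;           // not busy enough, keep scheduling
            }
          }
          return results.remove(0);
        }
      }

      // Called by a copier thread when a fetch finishes.
      void postResult(Result r) {
        synchronized (results) {
          results.add(r);
          results.notify();          // wake the scheduler if it is waiting
        }
      }
    }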

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=663837&r1=663836&r2=663837&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Jun  5 23:26:58 2008
@@ -238,6 +238,9 @@
     HADOOP-236. JobTracker now refuses connection from a task tracker with a 
     different version number. (Sharad Agarwal via ddas)
 
+    HADOOP-3427. Improves the shuffle scheduler. It now waits for notifications
+    from shuffle threads when it has scheduled enough, before scheduling more.
+    (ddas)
 
   OPTIMIZATIONS
 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=663837&r1=663836&r2=663837&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Jun  5 23:26:58 2008
@@ -369,6 +369,12 @@
     private int numCopiers;
     
     /**
+     *  the maximum number of fetches we will schedule before pausing
+     *  the scheduling
+     */
+    private int maxInFlight;
+    
+    /**
      * the amount of time spent on fetching one map output before considering 
      * it as failed and notifying the jobtracker about it.
      */
@@ -1114,6 +1120,7 @@
       this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
       this.copyResults = new ArrayList<CopyResult>(100);    
       this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
+      this.maxInFlight = 4 * numCopiers;
       this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
       this.combinerClass = conf.getCombinerClass();
       combineCollector = (null != combinerClass)
@@ -1160,6 +1167,10 @@
       this.maxMapRuntime = 0;
     }
     
+    private boolean busyEnough(int numInFlight) {
+      return numInFlight > maxInFlight;
+    }
+    
     @SuppressWarnings("unchecked")
     public boolean fetchOutputs() throws IOException {
       final int      numOutputs = reduceTask.getNumMaps();
@@ -1335,151 +1346,151 @@
           while (numInFlight > 0 && mergeThrowable == null) {
             LOG.debug(reduceTask.getTaskID() + " numInFlight = " + 
                       numInFlight);
-            CopyResult cr = getCopyResult();
+            // The call to getCopyResult will either
+            // 1) return immediately with null or a valid CopyResult object,
+            //    or
+            // 2) if numInFlight is above maxInFlight, block until a fetcher
+            //    thread posts a CopyResult via a notification, and then
+            //    return that object.
+            // So when getCopyResult returns null, we know we aren't busy
+            // enough and should go and get more map-completion events
+            // from the tasktracker.
+            CopyResult cr = getCopyResult(numInFlight);
+
+            if (cr == null) {
+              break;
+            }
             
-            if (cr != null) {
-              if (cr.getSuccess()) {  // a successful copy
-                numCopied++;
-                lastProgressTime = System.currentTimeMillis();
-                bytesTransferred += cr.getSize();
+            if (cr.getSuccess()) {  // a successful copy
+              numCopied++;
+              lastProgressTime = System.currentTimeMillis();
+              bytesTransferred += cr.getSize();
                 
-                long secsSinceStart = 
-                  (System.currentTimeMillis()-startTime)/1000+1;
-                float mbs = ((float)bytesTransferred)/(1024*1024);
-                float transferRate = mbs/secsSinceStart;
+              long secsSinceStart = 
+                (System.currentTimeMillis()-startTime)/1000+1;
+              float mbs = ((float)bytesTransferred)/(1024*1024);
+              float transferRate = mbs/secsSinceStart;
                 
-                copyPhase.startNextPhase();
-                copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs 
-                                    + " at " +
-                                    mbpsFormat.format(transferRate) +  " MB/s)");
+              copyPhase.startNextPhase();
+              copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs 
+                                  + " at " +
+                                  mbpsFormat.format(transferRate) +  " MB/s)");
                 
-                // Note successful fetch for this mapId to invalidate
-                // (possibly) old fetch-failures
-                fetchFailedMaps.remove(cr.getLocation().getTaskId());
-              } else if (cr.isObsolete()) {
-                //ignore
-                LOG.info(reduceTask.getTaskID() + 
-                         " Ignoring obsolete copy result for Map Task: " + 
-                         cr.getLocation().getTaskAttemptId() + " from host: " + 
-                         cr.getHost());
-              } else {
-                retryFetches.add(cr.getLocation());
-                
-                // note the failed-fetch
-                TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
-                TaskID mapId = cr.getLocation().getTaskId();
-                
-                totalFailures++;
-                Integer noFailedFetches = 
-                  mapTaskToFailedFetchesMap.get(mapTaskId);
-                noFailedFetches = 
-                  (noFailedFetches == null) ? 1 : (noFailedFetches + 1);
-                mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
-                LOG.info("Task " + getTaskID() + ": Failed fetch #" + 
-                         noFailedFetches + " from " + mapTaskId);
-                
-                // did the fetch fail too many times?
-                // using a hybrid technique for notifying the jobtracker.
-                //   a. the first notification is sent after max-retries 
-                //   b. subsequent notifications are sent after 2 retries.   
-                if ((noFailedFetches >= maxFetchRetriesPerMap) 
-                    && ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
-                  synchronized (ReduceTask.this) {
-                    taskStatus.addFetchFailedMap(mapTaskId);
-                    LOG.info("Failed to fetch map-output from " + mapTaskId + 
-                             " even after MAX_FETCH_RETRIES_PER_MAP retries... "
-                             + " reporting to the JobTracker");
-                  }
+              // Note successful fetch for this mapId to invalidate
+              // (possibly) old fetch-failures
+              fetchFailedMaps.remove(cr.getLocation().getTaskId());
+            } else if (cr.isObsolete()) {
+              //ignore
+              LOG.info(reduceTask.getTaskID() + 
+                       " Ignoring obsolete copy result for Map Task: " + 
+                       cr.getLocation().getTaskAttemptId() + " from host: " + 
+                       cr.getHost());
+            } else {
+              retryFetches.add(cr.getLocation());
+              
+              // note the failed-fetch
+              TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
+              TaskID mapId = cr.getLocation().getTaskId();
+              
+              totalFailures++;
+              Integer noFailedFetches = 
+                mapTaskToFailedFetchesMap.get(mapTaskId);
+              noFailedFetches = 
+                (noFailedFetches == null) ? 1 : (noFailedFetches + 1);
+              mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
+              LOG.info("Task " + getTaskID() + ": Failed fetch #" + 
+                       noFailedFetches + " from " + mapTaskId);
+              
+              // did the fetch fail too many times?
+              // using a hybrid technique for notifying the jobtracker.
+              //   a. the first notification is sent after max-retries 
+              //   b. subsequent notifications are sent after 2 retries.   
+              if ((noFailedFetches >= maxFetchRetriesPerMap) 
+                  && ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
+                synchronized (ReduceTask.this) {
+                  taskStatus.addFetchFailedMap(mapTaskId);
+                  LOG.info("Failed to fetch map-output from " + mapTaskId + 
+                           " even after MAX_FETCH_RETRIES_PER_MAP retries... "
+                           + " reporting to the JobTracker");
                 }
-
-                // note unique failed-fetch maps
-                if (noFailedFetches == maxFetchRetriesPerMap) {
-                  fetchFailedMaps.add(mapId);
-                  
-                  // did we have too many unique failed-fetch maps?
-                  // and did we fail on too many fetch attempts?
-                  // and did we progress enough
-                  //     or did we wait for too long without any progress?
-                  
-                  // check if the reducer is healthy
-                  boolean reducerHealthy = 
-                      (((float)totalFailures / (totalFailures + numCopied)) 
-                       < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
-                  
-                  // check if the reducer has progressed enough
-                  boolean reducerProgressedEnough = 
-                      (((float)numCopied / numMaps) 
-                       >= MIN_REQUIRED_PROGRESS_PERCENT);
-                  
-                  // check if the reducer is stalled for a long time
-                  
-                  // duration for which the reducer is stalled
-                  int stallDuration = 
-                      (int)(System.currentTimeMillis() - lastProgressTime);
-                  // duration for which the reducer ran with progress
-                  int shuffleProgressDuration = 
-                      (int)(lastProgressTime - startTime);
-                  // min time the reducer should run without getting killed
-                  int minShuffleRunDuration = 
-                      (shuffleProgressDuration > maxMapRuntime) 
-                      ? shuffleProgressDuration 
-                      : maxMapRuntime;
-                  boolean reducerStalled = 
-                      (((float)stallDuration / minShuffleRunDuration) 
-                       >= MAX_ALLOWED_STALL_TIME_PERCENT);
+              }
+              // note unique failed-fetch maps
+              if (noFailedFetches == maxFetchRetriesPerMap) {
+                fetchFailedMaps.add(mapId);
                   
-                  // kill if not healthy and has insufficient progress
-                  if ((fetchFailedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES)
-                      && !reducerHealthy 
-                      && (!reducerProgressedEnough || reducerStalled)) { 
-                    LOG.fatal("Shuffle failed with too many fetch failures " + 
-                              "and insufficient progress!" +
-                              "Killing task " + getTaskID() + ".");
-                    umbilical.shuffleError(getTaskID(), 
-                                           "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
-                                           + " bailing-out.");
-                  }
-                }
+                // did we have too many unique failed-fetch maps?
+                // and did we fail on too many fetch attempts?
+                // and did we progress enough
+                //     or did we wait for too long without any progress?
+               
+                // check if the reducer is healthy
+                boolean reducerHealthy = 
+                    (((float)totalFailures / (totalFailures + numCopied)) 
+                     < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
                 
-                // back off exponentially until num_retries <= max_retries
-                // back off by max_backoff/2 on subsequent failed attempts
-                currentTime = System.currentTimeMillis();
-                int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap 
-                                     ? BACKOFF_INIT 
-                                       * (1 << (noFailedFetches - 1)) 
-                                     : (this.maxBackoff * 1000 / 2);
-                penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
-                LOG.warn(reduceTask.getTaskID() + " adding host " +
-                         cr.getHost() + " to penalty box, next contact in " +
-                         (currentBackOff/1000) + " seconds");
+                // check if the reducer has progressed enough
+                boolean reducerProgressedEnough = 
+                    (((float)numCopied / numMaps) 
+                     >= MIN_REQUIRED_PROGRESS_PERCENT);
                 
-                // other outputs from the failed host may be present in the
-                // knownOutputs cache, purge them. This is important in case
-                // the failure is due to a lost tasktracker (causes many
-                // unnecessary backoffs). If not, we only take a small hit
-                // polling the tasktracker a few more times
-                Iterator<MapOutputLocation> locIt = knownOutputs.iterator();
-                while (locIt.hasNext()) {
-                  MapOutputLocation loc = locIt.next();
-                  if (cr.getHost().equals(loc.getHost())) {
-                    retryFetches.add(loc);
-                    locIt.remove();
-                  }
+                // check if the reducer is stalled for a long time
+                // duration for which the reducer is stalled
+                int stallDuration = 
+                    (int)(System.currentTimeMillis() - lastProgressTime);
+                // duration for which the reducer ran with progress
+                int shuffleProgressDuration = 
+                    (int)(lastProgressTime - startTime);
+                // min time the reducer should run without getting killed
+                int minShuffleRunDuration = 
+                    (shuffleProgressDuration > maxMapRuntime) 
+                    ? shuffleProgressDuration 
+                    : maxMapRuntime;
+                boolean reducerStalled = 
+                    (((float)stallDuration / minShuffleRunDuration) 
+                     >= MAX_ALLOWED_STALL_TIME_PERCENT);
+                
+                // kill if not healthy and has insufficient progress
+                if ((fetchFailedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES)
+                    && !reducerHealthy 
+                    && (!reducerProgressedEnough || reducerStalled)) { 
+                  LOG.fatal("Shuffle failed with too many fetch failures " + 
+                            "and insufficient progress!" +
+                            "Killing task " + getTaskID() + ".");
+                  umbilical.shuffleError(getTaskID(), 
+                                         "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
+                                         + " bailing-out.");
                 }
               }
-              uniqueHosts.remove(cr.getHost());
-              numInFlight--;
-            }
-            
-            //Check whether we have more CopyResult to check. If there is none,
-            //break
-            synchronized (copyResults) {
-              if (copyResults.size() == 0) {
-                break;
+                
+              // back off exponentially until num_retries <= max_retries
+              // back off by max_backoff/2 on subsequent failed attempts
+              currentTime = System.currentTimeMillis();
+              int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap 
+                                   ? BACKOFF_INIT 
+                                     * (1 << (noFailedFetches - 1)) 
+                                   : (this.maxBackoff * 1000 / 2);
+              penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
+              LOG.warn(reduceTask.getTaskID() + " adding host " +
+                       cr.getHost() + " to penalty box, next contact in " +
+                       (currentBackOff/1000) + " seconds");
+              
+              // other outputs from the failed host may be present in the
+              // knownOutputs cache, purge them. This is important in case
+              // the failure is due to a lost tasktracker (causes many
+              // unnecessary backoffs). If not, we only take a small hit
+              // polling the tasktracker a few more times
+              Iterator<MapOutputLocation> locIt = knownOutputs.iterator();
+              while (locIt.hasNext()) {
+                MapOutputLocation loc = locIt.next();
+                if (cr.getHost().equals(loc.getHost())) {
+                  retryFetches.add(loc);
+                  locIt.remove();
+                }
               }
             }
+            uniqueHosts.remove(cr.getHost());
+            numInFlight--;
           }
-          
         }
         
         // all done, inform the copiers to exit
@@ -1542,18 +1553,20 @@
       return inMemorySegments;
     }
     
-    private CopyResult getCopyResult() {  
+    private CopyResult getCopyResult(int numInFlight) {  
       synchronized (copyResults) {
-        if (copyResults.isEmpty()) {
+        while (copyResults.isEmpty()) {
           try {
-            copyResults.wait(2000); // wait for 2 sec 
+            //The idea is that if we have scheduled enough, we can wait until
+            //we hear from one of the copiers.
+            if (busyEnough(numInFlight)) {
+              copyResults.wait();
+            } else {
+              return null;
+            }
           } catch (InterruptedException e) { }
         }
-        if (copyResults.isEmpty()) {
-          return null;
-        } else {
-          return copyResults.remove(0);
-        }
+        return copyResults.remove(0);
       }    
     }
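
The wait() above only returns once a copier thread posts its result and calls
notify on the same copyResults list. That producer side is not part of this
diff; assuming the copier reports back through the shared list, it would look
roughly like the hypothetical sketch below (reportCopyResult is an
illustrative name, not a method in ReduceTask):

    private void reportCopyResult(CopyResult result) {
      synchronized (copyResults) {
        copyResults.add(result);
        copyResults.notify();   // wake getCopyResult if it is blocked
      }
    }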