You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/01/03 13:15:55 UTC

svn commit: r608462 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/ReduceTask.java

Author: ddas
Date: Thu Jan  3 04:15:53 2008
New Revision: 608462

URL: http://svn.apache.org/viewvc?rev=608462&view=rev
Log:
HADOOP-1719. Improves the utilization of shuffle copier threads. Contributed by Amar Kamat.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=608462&r1=608461&r2=608462&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Jan  3 04:15:53 2008
@@ -144,6 +144,9 @@
     performance. Make NullWritable implement Comparable. Make TextOutputFormat
     treat NullWritable like null. (omalley)
 
+    HADOOP-1719. Improves the utilization of shuffle copier threads.
+    (Amar Kamat via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-1898.  Release the lock protecting the last time of the last stack

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=608462&r1=608461&r2=608462&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Jan  3 04:15:53 2008
@@ -959,21 +959,30 @@
             // MapOutputLocations as values
             knownOutputs.addAll(retryFetches);
              
-            // The call getMapCompletionEvents will update fromEventId to
-            // used for the next call to getMapCompletionEvents
-            int currentNumKnownMaps = knownOutputs.size();
-            int currentNumObsoleteMapIds = obsoleteMapIds.size();
-            getMapCompletionEvents(fromEventId, knownOutputs);
+            // ensure we have enough to keep us busy
+            boolean busy = isBusy(numInFlight, numCopiers, lowThreshold, 
+                                  uniqueHosts.size(), probe_sample_size, 
+                                  numOutputs - numCopied);
+            if (!busy) {
+              // The call to getMapCompletionEvents will update fromEventId to
+              // be used for the next call to getMapCompletionEvents
+              int currentNumKnownMaps = knownOutputs.size();
+              int currentNumObsoleteMapIds = obsoleteMapIds.size();
+              getMapCompletionEvents(fromEventId, knownOutputs);
 
             
-            LOG.info(reduceTask.getTaskId() + ": " +  
+              LOG.info(reduceTask.getTaskId() + ": " +  
                      "Got " + (knownOutputs.size()-currentNumKnownMaps) + 
                      " new map-outputs & " + 
                      (obsoleteMapIds.size()-currentNumObsoleteMapIds) + 
                      " obsolete map-outputs from tasktracker and " + 
                      retryFetches.size() + " map-outputs from previous failures"
                      );
-            
+            } else {
+              LOG.info(" Busy enough - did not query the tasktracker for " 
+                       + "new map outputs. Have "+ retryFetches.size() 
+                       + " map outputs from previous failures");
+            }
             // clear the "failed" fetches hashmap
             retryFetches.clear();
           }
@@ -1181,16 +1190,10 @@
               numInFlight--;
             }
             
-            boolean busy = true;
-            // ensure we have enough to keep us busy
-            if (numInFlight < lowThreshold && (numOutputs-numCopied) > 
-                probe_sample_size) {
-              busy = false;
-            }
             //Check whether we have more CopyResult to check. If there is none,
-            //and we are not busy enough, break
+            //break
             synchronized (copyResults) {
-              if (copyResults.size() == 0 && !busy) {
+              if (copyResults.size() == 0) {
                 break;
               }
             }
@@ -1276,12 +1279,30 @@
       }
     }
     
+    /** Checks whether #uniqueHosts < #copiers, and if so concludes that
+    * we are not busy enough. The logic is that we fetch only one map output
+    * at a time from any given host, and uniqueHosts keeps track of that.
+    * As soon as we add a host to uniqueHosts, a 'copy' from that host is
+    * scheduled as well. Thus, when the size of uniqueHosts is >= numCopiers,
+    * it means that all copiers are busy. The converse is not true
+    * (e.g. in the case where we have more copiers than the number of hosts
+    * in the cluster), but it should generally be useful to do this check.
+    **/
+    private boolean isBusy(int numInFlight, int numCopiers, int lowThreshold,
+                           int uniqueHostsSize, int probeSampleSize,  
+                           int remainCopy) {
+      if ((numInFlight < lowThreshold && remainCopy > probeSampleSize) || 
+          uniqueHostsSize < numCopiers) {
+        return false;
+      }
+      return true;
+    }
     
     private CopyResult getCopyResult() {  
       synchronized (copyResults) {
-        while (copyResults.isEmpty()) {
+        if (copyResults.isEmpty()) {
           try {
-            copyResults.wait();
+            copyResults.wait(2000); // wait for 2 sec 
           } catch (InterruptedException e) { }
         }
         if (copyResults.isEmpty()) {