Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/09/06 00:54:39 UTC

svn commit: r692572 - in /hadoop/core/branches/branch-0.18: CHANGES.txt src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Author: cdouglas
Date: Fri Sep  5 15:54:39 2008
New Revision: 692572

URL: http://svn.apache.org/viewvc?rev=692572&view=rev
Log:
HADOOP-3940. Fix in-memory merge condition to wait when there are no map
outputs or when the final map outputs are being fetched without contention.
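The core of the fix is the third clause of the wait condition in waitForDataToMerge(). The following is a minimal, hypothetical model of that clause before and after the change. Only the parameter names (numPendingRequests, numRequiredMapOutputs) mirror fields in the diff. The class StallClause, its methods, and the stall limit value (standing in for numCopiers*MAX_STALLED_SHUFFLE_THREADS_FRACTION) are illustrative assumptions, not Hadoop code.

```java
/**
 * Hypothetical, simplified model of the "stalled copiers" clause of the
 * in-memory merge wait condition, before and after HADOOP-3940.
 * Returning true means "keep waiting"; false lets the merge start.
 */
public class StallClause {

  // Before the fix: with numRequiredMapOutputs == 0 the second comparison
  // (numPendingRequests < 0) is always false, so the merger stops waiting
  // even when nothing has been fetched yet.
  static boolean keepWaitingOld(int numPendingRequests, double stallLimit,
                                int numRequiredMapOutputs) {
    return numPendingRequests < stallLimit
        && numPendingRequests < numRequiredMapOutputs;
  }

  // After the fix: a zero numRequiredMapOutputs no longer forces a merge;
  // only the stall limit applies in that case.
  static boolean keepWaitingNew(int numPendingRequests, double stallLimit,
                                int numRequiredMapOutputs) {
    return numPendingRequests < stallLimit
        && (0 == numRequiredMapOutputs
            || numPendingRequests < numRequiredMapOutputs);
  }

  public static void main(String[] args) {
    double stallLimit = 15.0; // hypothetical numCopiers * fraction
    // No map outputs fetched yet: old logic proceeds, new logic waits.
    System.out.println(keepWaitingOld(0, stallLimit, 0)); // false
    System.out.println(keepWaitingNew(0, stallLimit, 0)); // true
    // All remaining outputs are blocked on memory: both allow the merge.
    System.out.println(keepWaitingNew(3, stallLimit, 3)); // false
  }
}
```

In this model the only behavioral difference is the numRequiredMapOutputs == 0 case; when outputs are still required, both versions agree.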

Modified:
    hadoop/core/branches/branch-0.18/CHANGES.txt
    hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=692572&r1=692571&r2=692572&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Fri Sep  5 15:54:39 2008
@@ -17,6 +17,10 @@
     HADOOP-4046. Made WritableComparable's constructor protected instead of 
     private to re-enable class derivation. (cdouglas via omalley)
 
+    HADOOP-3940. Fix in-memory merge condition to wait when there are no map
+    outputs or when the final map outputs are being fetched without contention.
+    (cdouglas)
+
 Release 0.18.0 - 2008-08-19
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=692572&r1=692571&r2=692572&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Sep  5 15:54:39 2008
@@ -790,17 +790,31 @@
       public boolean waitForDataToMerge() throws InterruptedException {
         boolean done = false;
         synchronized (dataAvailable) {
-          while (!closed &&
+                 // Start in-memory merge if manager has been closed or...
+          while (!closed
+                 &&
+                 // In-memory threshold exceeded and at least two segments
+                 // have been fetched
                  (getPercentUsed() < MAX_INMEM_FILESYS_USE ||
                   numClosed < 
                     (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)
                  ) 
                  &&
+                 // More than "mapred.inmem.merge.threshold" map outputs
+                 // have been fetched into memory
                  (mergeThreshold <= 0 || numClosed < mergeThreshold) 
                  && 
+                 // More than MAX... threads are blocked on the RamManager
+                 // or the blocked threads are the last map outputs to be
+                 // fetched. If numRequiredMapOutputs is zero, either
+                 // setNumCopiedMapOutputs has not been called (no map ouputs
+                 // have been fetched, so there is nothing to merge) or the
+                 // last map outputs being transferred without
+                 // contention, so a merge would be premature.
                  (numPendingRequests < 
                       numCopiers*MAX_STALLED_SHUFFLE_THREADS_FRACTION && 
-                   numPendingRequests < numRequiredMapOutputs)) {
+                  (0 == numRequiredMapOutputs ||
+                   numPendingRequests < numRequiredMapOutputs))) {
             dataAvailable.wait();
           }
           done = closed;
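waitForDataToMerge() relies on the standard Java guarded-wait idiom: re-check a condition in a loop around wait() on a shared monitor, while other threads update state and notify under the same lock. The following is a minimal, hypothetical sketch of that idiom. The class MergeSignal, its methods, and its reduced condition (closed, or numClosed reaching a merge threshold) are assumptions for illustration only. The real ReduceTask condition is the larger expression in the diff, and this sketch's use of notifyAll() is its own choice, not a claim about Hadoop's code.

```java
/**
 * Hypothetical sketch of the guarded-wait pattern behind waitForDataToMerge().
 * The merge condition here is deliberately reduced to two fields.
 */
public class MergeSignal {
  private final Object dataAvailable = new Object();
  private boolean closed = false;
  private int numClosed = 0;           // segments fetched into memory
  private final int mergeThreshold;    // hypothetical stand-in for the real threshold

  public MergeSignal(int mergeThreshold) {
    this.mergeThreshold = mergeThreshold;
  }

  // Condition under which the merger may proceed; caller must hold the lock.
  private boolean readyLocked() {
    return closed || numClosed >= mergeThreshold;
  }

  public boolean isReady() {
    synchronized (dataAvailable) {
      return readyLocked();
    }
  }

  /** Blocks until a merge may start; returns true if the manager was closed. */
  public boolean waitForDataToMerge() throws InterruptedException {
    synchronized (dataAvailable) {
      // A loop, not an if: wait() may return spuriously, and the state may
      // change again before this thread reacquires the monitor.
      while (!readyLocked()) {
        dataAvailable.wait();
      }
      return closed;
    }
  }

  public void segmentFetched() {
    synchronized (dataAvailable) {
      numClosed++;
      dataAvailable.notifyAll();
    }
  }

  public void close() {
    synchronized (dataAvailable) {
      closed = true;
      dataAvailable.notifyAll();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    MergeSignal signal = new MergeSignal(2);
    Thread merger = new Thread(() -> {
      try {
        System.out.println("merge start, closed=" + signal.waitForDataToMerge());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    merger.start();
    signal.segmentFetched();
    signal.segmentFetched();   // threshold reached; the merger wakes up
    merger.join(5000);
  }
}
```

The HADOOP-3940 change only alters the predicate inside the while loop; the wait/notify structure around it stays the same.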