You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2012/12/21 19:32:57 UTC

svn commit: r1425074 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src...

Author: jlowe
Date: Fri Dec 21 18:32:57 2012
New Revision: 1425074

URL: http://svn.apache.org/viewvc?rev=1425074&view=rev
Log:
svn merge -c 1425071 FIXES: MAPREDUCE-4842. Shuffle race can hang reducer. Contributed by Mariappan Asokan

Added:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
      - copied unchanged from r1425071, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1425074&r1=1425073&r2=1425074&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Fri Dec 21 18:32:57 2012
@@ -473,6 +473,8 @@ Release 0.23.6 - UNRELEASED
     MAPREDUCE-4836. Elapsed time for running tasks on AM web UI tasks page is 0
     (Ravi Prakash via jeagles)
 
+    MAPREDUCE-4842. Shuffle race can hang reducer (Mariappan Asokan via jlowe)
+
 Release 0.23.5 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=1425074&r1=1425073&r2=1425074&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Fri Dec 21 18:32:57 2012
@@ -58,7 +58,9 @@ import org.apache.hadoop.mapreduce.task.
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
 
-@SuppressWarnings(value={"unchecked", "deprecation"})
+import com.google.common.annotations.VisibleForTesting;
+
+@SuppressWarnings(value={"unchecked"})
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class MergeManager<K, V> {
@@ -85,7 +87,7 @@ public class MergeManager<K, V> {
 
   Set<MapOutput<K, V>> inMemoryMapOutputs = 
     new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>());
-  private final InMemoryMerger inMemoryMerger;
+  private final MergeThread<MapOutput<K,V>, K,V> inMemoryMerger;
   
   Set<Path> onDiskMapOutputs = new TreeSet<Path>();
   private final OnDiskMerger onDiskMerger;
@@ -179,6 +181,8 @@ public class MergeManager<K, V> {
           + singleShuffleMemoryLimitPercent);
     }
 
+    usedMemory = 0L;
+    commitMemory = 0L;
     this.maxSingleShuffleLimit = 
       (long)(memoryLimit * singleShuffleMemoryLimitPercent);
     this.memToMemMergeOutputsThreshold = 
@@ -210,7 +214,7 @@ public class MergeManager<K, V> {
       this.memToMemMerger = null;
     }
     
-    this.inMemoryMerger = new InMemoryMerger(this);
+    this.inMemoryMerger = createInMemoryMerger();
     this.inMemoryMerger.start();
     
     this.onDiskMerger = new OnDiskMerger(this);
@@ -219,11 +223,19 @@ public class MergeManager<K, V> {
     this.mergePhase = mergePhase;
   }
   
+  protected MergeThread<MapOutput<K,V>, K,V> createInMemoryMerger() {
+    return new InMemoryMerger(this);
+  }
 
   TaskAttemptID getReduceId() {
     return reduceId;
   }
 
+  @VisibleForTesting
+  ExceptionReporter getExceptionReporter() {
+    return exceptionReporter;
+  }
+
   public void waitForInMemoryMerge() throws InterruptedException {
     inMemoryMerger.waitForMerge();
   }
@@ -288,7 +300,6 @@ public class MergeManager<K, V> {
   }
   
   synchronized void unreserve(long size) {
-    commitMemory -= size;
     usedMemory -= size;
   }
 
@@ -300,24 +311,20 @@ public class MergeManager<K, V> {
 
     commitMemory+= mapOutput.getSize();
 
-    synchronized (inMemoryMerger) {
-      // Can hang if mergeThreshold is really low.
-      if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold) {
-        LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
-            commitMemory + " > mergeThreshold=" + mergeThreshold + 
-            ". Current usedMemory=" + usedMemory);
-        inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
-        inMemoryMergedMapOutputs.clear();
-        inMemoryMerger.startMerge(inMemoryMapOutputs);
-      } 
+    // Can hang if mergeThreshold is really low.
+    if (commitMemory >= mergeThreshold) {
+      LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
+          commitMemory + " > mergeThreshold=" + mergeThreshold + 
+          ". Current usedMemory=" + usedMemory);
+      inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
+      inMemoryMergedMapOutputs.clear();
+      inMemoryMerger.startMerge(inMemoryMapOutputs);
+      commitMemory = 0L;  // Reset commitMemory.
     }
     
     if (memToMemMerger != null) {
-      synchronized (memToMemMerger) {
-        if (!memToMemMerger.isInProgress() && 
-            inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
-          memToMemMerger.startMerge(inMemoryMapOutputs);
-        }
+      if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) { 
+        memToMemMerger.startMerge(inMemoryMapOutputs);
       }
     }
   }
@@ -333,11 +340,8 @@ public class MergeManager<K, V> {
   public synchronized void closeOnDiskFile(Path file) {
     onDiskMapOutputs.add(file);
     
-    synchronized (onDiskMerger) {
-      if (!onDiskMerger.isInProgress() && 
-          onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
-        onDiskMerger.startMerge(onDiskMapOutputs);
-      }
+    if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
+      onDiskMerger.startMerge(onDiskMapOutputs);
     }
   }
   

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java?rev=1425074&r1=1425073&r2=1425074&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java Fri Dec 21 18:32:57 2012
@@ -20,8 +20,10 @@ package org.apache.hadoop.mapreduce.task
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,8 +32,8 @@ abstract class MergeThread<T,K,V> extend
   
   private static final Log LOG = LogFactory.getLog(MergeThread.class);
 
-  private volatile boolean inProgress = false;
-  private List<T> inputs = new ArrayList<T>();
+  private AtomicInteger numPending = new AtomicInteger(0);
+  private LinkedList<List<T>> pendingToBeMerged;
   protected final MergeManager<K,V> manager;
   private final ExceptionReporter reporter;
   private boolean closed = false;
@@ -39,6 +41,7 @@ abstract class MergeThread<T,K,V> extend
   
   public MergeThread(MergeManager<K,V> manager, int mergeFactor,
                      ExceptionReporter reporter) {
+    this.pendingToBeMerged = new LinkedList<List<T>>();
     this.manager = manager;
     this.mergeFactor = mergeFactor;
     this.reporter = reporter;
@@ -50,53 +53,55 @@ abstract class MergeThread<T,K,V> extend
     interrupt();
   }
 
-  public synchronized boolean isInProgress() {
-    return inProgress;
-  }
-  
-  public synchronized void startMerge(Set<T> inputs) {
+  public void startMerge(Set<T> inputs) {
     if (!closed) {
-      inProgress = true;
-      this.inputs = new ArrayList<T>();
+      numPending.incrementAndGet();
+      List<T> toMergeInputs = new ArrayList<T>();
       Iterator<T> iter=inputs.iterator();
       for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
-        this.inputs.add(iter.next());
+        toMergeInputs.add(iter.next());
         iter.remove();
       }
-      LOG.info(getName() + ": Starting merge with " + this.inputs.size() + 
+      LOG.info(getName() + ": Starting merge with " + toMergeInputs.size() + 
                " segments, while ignoring " + inputs.size() + " segments");
-      notifyAll();
+      synchronized(pendingToBeMerged) {
+        pendingToBeMerged.addLast(toMergeInputs);
+        pendingToBeMerged.notifyAll();
+      }
     }
   }
 
   public synchronized void waitForMerge() throws InterruptedException {
-    while (inProgress) {
+    while (numPending.get() > 0) {
       wait();
     }
   }
 
   public void run() {
     while (true) {
+      List<T> inputs = null;
       try {
         // Wait for notification to start the merge...
-        synchronized (this) {
-          while (!inProgress) {
-            wait();
+        synchronized (pendingToBeMerged) {
+          while(pendingToBeMerged.size() <= 0) {
+            pendingToBeMerged.wait();
           }
+          // Pickup the inputs to merge.
+          inputs = pendingToBeMerged.removeFirst();
         }
 
         // Merge
         merge(inputs);
       } catch (InterruptedException ie) {
+        numPending.set(0);
         return;
       } catch(Throwable t) {
+        numPending.set(0);
         reporter.reportException(t);
         return;
       } finally {
         synchronized (this) {
-          // Clear inputs
-          inputs = null;
-          inProgress = false;        
+          numPending.decrementAndGet();
           notifyAll();
         }
       }