You are viewing a plain-text version of this content. (The link to the canonical version was a hyperlink that is not preserved in this plain-text copy.)
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2012/12/21 19:35:25 UTC
svn commit: r1425075 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/...
Author: jlowe
Date: Fri Dec 21 18:35:24 2012
New Revision: 1425075
URL: http://svn.apache.org/viewvc?rev=1425075&view=rev
Log:
svn merge -c 1425071 FIXES: MAPREDUCE-4842. Shuffle race can hang reducer. Contributed by Mariappan Asokan
Added:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
- copied unchanged from r1425071, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1425075&r1=1425074&r2=1425075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Dec 21 18:35:24 2012
@@ -38,6 +38,8 @@ Release 0.23.6 - UNRELEASED
MAPREDUCE-4856. TestJobOutputCommitter uses same directory as TestJobCleanup
(Sandy Ryza via tgraves)
+ MAPREDUCE-4842. Shuffle race can hang reducer (Mariappan Asokan via jlowe)
+
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=1425075&r1=1425074&r2=1425075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Fri Dec 21 18:35:24 2012
@@ -58,7 +58,9 @@ import org.apache.hadoop.mapreduce.task.
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
-@SuppressWarnings(value={"unchecked", "deprecation"})
+import com.google.common.annotations.VisibleForTesting;
+
+@SuppressWarnings(value={"unchecked"})
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class MergeManager<K, V> {
@@ -85,7 +87,7 @@ public class MergeManager<K, V> {
Set<MapOutput<K, V>> inMemoryMapOutputs =
new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>());
- private final InMemoryMerger inMemoryMerger;
+ private final MergeThread<MapOutput<K,V>, K,V> inMemoryMerger;
Set<Path> onDiskMapOutputs = new TreeSet<Path>();
private final OnDiskMerger onDiskMerger;
@@ -179,6 +181,8 @@ public class MergeManager<K, V> {
+ singleShuffleMemoryLimitPercent);
}
+ usedMemory = 0L;
+ commitMemory = 0L;
this.maxSingleShuffleLimit =
(long)(memoryLimit * singleShuffleMemoryLimitPercent);
this.memToMemMergeOutputsThreshold =
@@ -210,7 +214,7 @@ public class MergeManager<K, V> {
this.memToMemMerger = null;
}
- this.inMemoryMerger = new InMemoryMerger(this);
+ this.inMemoryMerger = createInMemoryMerger();
this.inMemoryMerger.start();
this.onDiskMerger = new OnDiskMerger(this);
@@ -219,11 +223,19 @@ public class MergeManager<K, V> {
this.mergePhase = mergePhase;
}
+ protected MergeThread<MapOutput<K,V>, K,V> createInMemoryMerger() {
+ return new InMemoryMerger(this);
+ }
TaskAttemptID getReduceId() {
return reduceId;
}
+ @VisibleForTesting
+ ExceptionReporter getExceptionReporter() {
+ return exceptionReporter;
+ }
+
public void waitForInMemoryMerge() throws InterruptedException {
inMemoryMerger.waitForMerge();
}
@@ -288,7 +300,6 @@ public class MergeManager<K, V> {
}
synchronized void unreserve(long size) {
- commitMemory -= size;
usedMemory -= size;
}
@@ -300,24 +311,20 @@ public class MergeManager<K, V> {
commitMemory+= mapOutput.getSize();
- synchronized (inMemoryMerger) {
- // Can hang if mergeThreshold is really low.
- if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold) {
- LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
- commitMemory + " > mergeThreshold=" + mergeThreshold +
- ". Current usedMemory=" + usedMemory);
- inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
- inMemoryMergedMapOutputs.clear();
- inMemoryMerger.startMerge(inMemoryMapOutputs);
- }
+ // Can hang if mergeThreshold is really low.
+ if (commitMemory >= mergeThreshold) {
+ LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
+ commitMemory + " > mergeThreshold=" + mergeThreshold +
+ ". Current usedMemory=" + usedMemory);
+ inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
+ inMemoryMergedMapOutputs.clear();
+ inMemoryMerger.startMerge(inMemoryMapOutputs);
+ commitMemory = 0L; // Reset commitMemory.
}
if (memToMemMerger != null) {
- synchronized (memToMemMerger) {
- if (!memToMemMerger.isInProgress() &&
- inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
- memToMemMerger.startMerge(inMemoryMapOutputs);
- }
+ if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
+ memToMemMerger.startMerge(inMemoryMapOutputs);
}
}
}
@@ -333,11 +340,8 @@ public class MergeManager<K, V> {
public synchronized void closeOnDiskFile(Path file) {
onDiskMapOutputs.add(file);
- synchronized (onDiskMerger) {
- if (!onDiskMerger.isInProgress() &&
- onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
- onDiskMerger.startMerge(onDiskMapOutputs);
- }
+ if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
+ onDiskMerger.startMerge(onDiskMapOutputs);
}
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java?rev=1425075&r1=1425074&r2=1425075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java Fri Dec 21 18:35:24 2012
@@ -20,8 +20,10 @@ package org.apache.hadoop.mapreduce.task
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,8 +32,8 @@ abstract class MergeThread<T,K,V> extend
private static final Log LOG = LogFactory.getLog(MergeThread.class);
- private volatile boolean inProgress = false;
- private List<T> inputs = new ArrayList<T>();
+ private AtomicInteger numPending = new AtomicInteger(0);
+ private LinkedList<List<T>> pendingToBeMerged;
protected final MergeManager<K,V> manager;
private final ExceptionReporter reporter;
private boolean closed = false;
@@ -39,6 +41,7 @@ abstract class MergeThread<T,K,V> extend
public MergeThread(MergeManager<K,V> manager, int mergeFactor,
ExceptionReporter reporter) {
+ this.pendingToBeMerged = new LinkedList<List<T>>();
this.manager = manager;
this.mergeFactor = mergeFactor;
this.reporter = reporter;
@@ -50,53 +53,55 @@ abstract class MergeThread<T,K,V> extend
interrupt();
}
- public synchronized boolean isInProgress() {
- return inProgress;
- }
-
- public synchronized void startMerge(Set<T> inputs) {
+ public void startMerge(Set<T> inputs) {
if (!closed) {
- inProgress = true;
- this.inputs = new ArrayList<T>();
+ numPending.incrementAndGet();
+ List<T> toMergeInputs = new ArrayList<T>();
Iterator<T> iter=inputs.iterator();
for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
- this.inputs.add(iter.next());
+ toMergeInputs.add(iter.next());
iter.remove();
}
- LOG.info(getName() + ": Starting merge with " + this.inputs.size() +
+ LOG.info(getName() + ": Starting merge with " + toMergeInputs.size() +
" segments, while ignoring " + inputs.size() + " segments");
- notifyAll();
+ synchronized(pendingToBeMerged) {
+ pendingToBeMerged.addLast(toMergeInputs);
+ pendingToBeMerged.notifyAll();
+ }
}
}
public synchronized void waitForMerge() throws InterruptedException {
- while (inProgress) {
+ while (numPending.get() > 0) {
wait();
}
}
public void run() {
while (true) {
+ List<T> inputs = null;
try {
// Wait for notification to start the merge...
- synchronized (this) {
- while (!inProgress) {
- wait();
+ synchronized (pendingToBeMerged) {
+ while(pendingToBeMerged.size() <= 0) {
+ pendingToBeMerged.wait();
}
+ // Pickup the inputs to merge.
+ inputs = pendingToBeMerged.removeFirst();
}
// Merge
merge(inputs);
} catch (InterruptedException ie) {
+ numPending.set(0);
return;
} catch(Throwable t) {
+ numPending.set(0);
reporter.reportException(t);
return;
} finally {
synchronized (this) {
- // Clear inputs
- inputs = null;
- inProgress = false;
+ numPending.decrementAndGet();
notifyAll();
}
}