You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/06/01 23:15:49 UTC
svn commit: r543619 - in /lucene/hadoop/branches/branch-0.13: CHANGES.txt
src/java/org/apache/hadoop/mapred/MapTask.java
src/java/org/apache/hadoop/mapred/ReduceTask.java
Author: cutting
Date: Fri Jun 1 14:15:48 2007
New Revision: 543619
URL: http://svn.apache.org/viewvc?view=rev&rev=543619
Log:
Merge -r 543606:543607 from trunk to 0.13 branch, with manual changes to resolve conflicts. Fixes: HADOOP-1431.
Modified:
lucene/hadoop/branches/branch-0.13/CHANGES.txt
lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/MapTask.java
lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/ReduceTask.java
Modified: lucene/hadoop/branches/branch-0.13/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/CHANGES.txt?view=diff&rev=543619&r1=543618&r2=543619
==============================================================================
--- lucene/hadoop/branches/branch-0.13/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.13/CHANGES.txt Fri Jun 1 14:15:48 2007
@@ -436,6 +436,10 @@
130. HADOOP-1332. Fix so that TaskTracker exits reliably during unit
tests on Windows. (omalley via cutting)
+131. HADOOP-1431. Fix so that sort progress reporting during map runs
+ only while sorting, so that stuck maps are correctly terminated.
+ (Devaraj Das and Arun C Murthy via cutting)
+
Release 0.12.3 - 2007-04-06
Modified: lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=543619&r1=543618&r2=543619
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/MapTask.java Fri Jun 1 14:15:48 2007
@@ -163,6 +163,7 @@
throws IOException {
setProgress(getProgress());
+ reportProgress(umbilical);
long beforePos = getPos();
boolean ret = rawIn.next(key, value);
if (ret) {
@@ -178,22 +179,16 @@
}
};
- Thread sortProgress = createProgressThread(umbilical);
MapRunnable runner =
(MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
try {
- sortProgress.start();
runner.run(in, collector, reporter);
collector.flush();
} finally {
//close
in.close(); // close input
collector.close();
- sortProgress.interrupt();
- try {
- sortProgress.join();
- } catch (InterruptedException ie){ }
}
done(umbilical);
}
@@ -220,6 +215,7 @@
};
sortProgress.setName("Sort progress reporter for task "+getTaskId());
sortProgress.setDaemon(true);
+ sortProgress.start();
return sortProgress;
}
@@ -381,10 +377,20 @@
for (int i = 0; i < partitions; i++)
totalMem += sortImpl[i].getMemoryUtilized();
if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
- sortAndSpillToDisk();
- keyValBuffer.reset();
- for (int i = 0; i < partitions; i++)
- sortImpl[i].close();
+
+ // Start the progress thread
+ Thread progress = createProgressThread(umbilical);
+
+ try {
+ sortAndSpillToDisk();
+ keyValBuffer.reset();
+ for (int i = 0; i < partitions; i++) {
+ sortImpl[i].close();
+ }
+ } finally {
+ // Stop the progress thread
+ progress.interrupt();
+ }
}
}
}
@@ -602,13 +608,22 @@
}
public void flush() throws IOException {
- //check whether the length of the key/value buffer is 0. If not, then
- //we need to spill that to disk. Note that we reset the key/val buffer
- //upon each spill (so a length > 0 means that we have not spilled yet)
- if (keyValBuffer.getLength() > 0) {
- sortAndSpillToDisk();
+
+ // Start the progress thread
+ Thread progress = createProgressThread(umbilical);
+
+ try {
+ //check whether the length of the key/value buffer is 0. If not, then
+ //we need to spill that to disk. Note that we reset the key/val buffer
+ //upon each spill (so a length > 0 means that we have not spilled yet)
+ if (keyValBuffer.getLength() > 0) {
+ sortAndSpillToDisk();
+ }
+ mergeParts();
+ } finally {
+ // Stop the progress thread
+ progress.interrupt();
}
- mergeParts();
}
}
}
Modified: lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=543619&r1=543618&r2=543619
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Jun 1 14:15:48 2007
@@ -37,6 +37,7 @@
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -78,7 +79,7 @@
private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
private int numMaps;
- private boolean sortComplete;
+ AtomicBoolean sortComplete = new AtomicBoolean(false);
private ReduceCopier reduceCopier;
{
@@ -283,7 +284,7 @@
// spawn a thread to give sort progress heartbeats
Thread sortProgress = new Thread() {
public void run() {
- while (!sortComplete) {
+ while (!sortComplete.get()) {
try {
reportProgress(umbilical);
Thread.sleep(PROGRESS_INTERVAL);
@@ -298,6 +299,7 @@
}
}
};
+ sortProgress.setDaemon(true);
sortProgress.setName("Sort progress reporter for task "+getTaskId());
Path tempDir = new Path(getTaskId());
@@ -317,7 +319,7 @@
!conf.getKeepFailedTaskFiles()); // sort
} finally {
- sortComplete = true;
+ sortComplete.set(true);
}
sortPhase.complete(); // sort is complete