You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/06/01 22:52:09 UTC
svn commit: r543607 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/MapTask.java
src/java/org/apache/hadoop/mapred/ReduceTask.java
Author: cutting
Date: Fri Jun 1 13:52:09 2007
New Revision: 543607
URL: http://svn.apache.org/viewvc?view=rev&rev=543607
Log:
HADOOP-1431. Fix so that sort progress reporting during map runs only while sorting, so that stuck maps are correctly terminated. Contributed by Arun & Devaraj.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=543607&r1=543606&r2=543607
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jun 1 13:52:09 2007
@@ -511,6 +511,10 @@
130. HADOOP-1332. Fix so that TaskTracker exits reliably during unit
tests on Windows. (omalley via cutting)
+131. HADOOP-1431. Fix so that sort progress reporting during map runs
+ only while sorting, so that stuck maps are correctly terminated.
+ (Devaraj Das and Arun C Murthy via cutting)
+
Release 0.12.3 - 2007-04-06
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=543607&r1=543606&r2=543607
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Jun 1 13:52:09 2007
@@ -163,6 +163,7 @@
throws IOException {
setProgress(getProgress());
+ reportProgress(umbilical);
long beforePos = getPos();
boolean ret = rawIn.next(key, value);
if (ret) {
@@ -178,22 +179,16 @@
}
};
- Thread sortProgress = createProgressThread(umbilical);
MapRunnable runner =
(MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
try {
- sortProgress.start();
runner.run(in, collector, reporter);
collector.flush();
} finally {
//close
in.close(); // close input
collector.close();
- sortProgress.interrupt();
- try {
- sortProgress.join();
- } catch (InterruptedException ie){ }
}
done(umbilical);
}
@@ -220,6 +215,7 @@
};
sortProgress.setName("Sort progress reporter for task "+getTaskId());
sortProgress.setDaemon(true);
+ sortProgress.start();
return sortProgress;
}
@@ -273,6 +269,7 @@
private Partitioner partitioner;
private JobConf job;
private Reporter reporter;
+ final private TaskUmbilicalProtocol umbilical;
private DataOutputBuffer keyValBuffer; //the buffer where key/val will
//be stored before they are
@@ -302,6 +299,7 @@
this.job = job;
this.reporter = reporter;
+ this.umbilical = umbilical;
this.comparator = job.getOutputKeyComparator();
this.keyClass = job.getMapOutputKeyClass();
this.valClass = job.getMapOutputValueClass();
@@ -324,6 +322,7 @@
job.getClass("map.sort.class", MergeSorter.class,
BufferSorter.class), job);
}
+
private void startPartition(int partNumber) throws IOException {
//We create the sort output as multiple sequence files within a spilled
//file. So we create a writer for each partition.
@@ -376,10 +375,20 @@
for (int i = 0; i < partitions; i++)
totalMem += sortImpl[i].getMemoryUtilized();
if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
- sortAndSpillToDisk();
- keyValBuffer.reset();
- for (int i = 0; i < partitions; i++)
- sortImpl[i].close();
+
+ // Start the progress thread
+ Thread progress = createProgressThread(umbilical);
+
+ try {
+ sortAndSpillToDisk();
+ keyValBuffer.reset();
+ for (int i = 0; i < partitions; i++) {
+ sortImpl[i].close();
+ }
+ } finally {
+ // Stop the progress thread
+ progress.interrupt();
+ }
}
}
}
@@ -597,13 +606,22 @@
}
public void flush() throws IOException {
- //check whether the length of the key/value buffer is 0. If not, then
- //we need to spill that to disk. Note that we reset the key/val buffer
- //upon each spill (so a length > 0 means that we have not spilled yet)
- if (keyValBuffer.getLength() > 0) {
- sortAndSpillToDisk();
+
+ // Start the progress thread
+ Thread progress = createProgressThread(umbilical);
+
+ try {
+ //check whether the length of the key/value buffer is 0. If not, then
+ //we need to spill that to disk. Note that we reset the key/val buffer
+ //upon each spill (so a length > 0 means that we have not spilled yet)
+ if (keyValBuffer.getLength() > 0) {
+ sortAndSpillToDisk();
+ }
+ mergeParts();
+ } finally {
+ // Stop the progress thread
+ progress.interrupt();
}
- mergeParts();
}
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=543607&r1=543606&r2=543607
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Jun 1 13:52:09 2007
@@ -37,6 +37,7 @@
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -78,7 +79,7 @@
private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
private int numMaps;
- private boolean sortComplete;
+ AtomicBoolean sortComplete = new AtomicBoolean(false);
private ReduceCopier reduceCopier;
{
@@ -283,7 +284,7 @@
// spawn a thread to give sort progress heartbeats
Thread sortProgress = new Thread() {
public void run() {
- while (!sortComplete) {
+ while (!sortComplete.get()) {
try {
reportProgress(umbilical);
Thread.sleep(PROGRESS_INTERVAL);
@@ -298,6 +299,7 @@
}
}
};
+ sortProgress.setDaemon(true);
sortProgress.setName("Sort progress reporter for task "+getTaskId());
Path tempDir = new Path(getTaskId());
@@ -317,7 +319,7 @@
!conf.getKeepFailedTaskFiles()); // sort
} finally {
- sortComplete = true;
+ sortComplete.set(true);
}
sortPhase.complete(); // sort is complete