Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/16 00:31:39 UTC
svn commit: r487715 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/MapTask.java
Author: cutting
Date: Fri Dec 15 15:31:39 2006
New Revision: 487715
URL: http://svn.apache.org/viewvc?view=rev&rev=487715
Log:
HADOOP-813. Fix map output sorting to report progress. Contributed by Devaraj.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=487715&r1=487714&r2=487715
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Dec 15 15:31:39 2006
@@ -101,6 +101,10 @@
28. HADOOP-824. Rename DFSShell to be FsShell, since it applies
generically to all FileSystem implementations. (cutting)
+29. HADOOP-813. Fix map output sorting to report progress, so that
+ sorts which take longer than the task timeout do not fail.
+ (Devaraj Das via cutting)
+
Release 0.9.2 - 2006-12-15
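For context, the pattern this patch introduces is a daemon thread that keeps
sending progress heartbeats while the main task thread is blocked inside the
merge, so the TaskTracker does not time the task out. A minimal,
self-contained sketch of that pattern follows; the reportProgress() stand-in
and the 3-second interval are illustrative assumptions, not the real Hadoop
umbilical API (the actual change is in the MapTask.java diff below).

public class SortProgressSketch {
  static final long PROGRESS_INTERVAL = 3000; // ms; assumed value for the sketch

  // Stand-in for Task.reportProgress(umbilical) in the real patch.
  static void reportProgress() {
    System.out.println("heartbeat at " + System.currentTimeMillis());
  }

  public static void main(String[] args) throws InterruptedException {
    Thread sortProgress = new Thread() {
      public void run() {
        while (true) {
          try {
            reportProgress();
            Thread.sleep(PROGRESS_INTERVAL);
          } catch (InterruptedException e) {
            return;                  // interrupt() below ends the reporter
          } catch (Throwable t) {
            t.printStackTrace();     // a failed heartbeat must not kill the
          }                          // reporter; log it and keep going
        }
      }
    };
    sortProgress.setName("Sort progress reporter");
    sortProgress.setDaemon(true);    // never keeps the JVM alive on its own
    sortProgress.start();
    try {
      Thread.sleep(10000);           // stand-in for the long-running sort/merge
    } finally {
      sortProgress.interrupt();      // always stop the reporter, even on error
    }
  }
}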
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=487715&r1=487714&r2=487715
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Dec 15 15:31:39 2006
@@ -35,6 +35,7 @@
import org.apache.commons.logging.*;
import org.apache.hadoop.metrics.Metrics;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
@@ -341,6 +342,7 @@
throws IOException {
synchronized (this) {
writer.append(key, value);
+ reportProgress(umbilical);
}
}
};
@@ -366,6 +368,7 @@
while (values.more()) {
combiner.reduce(values.getKey(), values, combineCollector, reporter);
values.nextKey();
+ reportProgress(umbilical);
}
}
@@ -393,6 +396,7 @@
value.readFields(valIn);
writer.append(key, value);
+ reportProgress(umbilical);
}
}
@@ -425,62 +429,87 @@
compressionType, codec);
finalIndexOut.writeLong(segmentStart);
finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
+ reportProgress(umbilical);
}
finalOut.close();
finalIndexOut.close();
return;
}
-
- Path [] filename = new Path[numSpills];
- Path [] indexFileName = new Path[numSpills];
- FSDataInputStream in[] = new FSDataInputStream[numSpills];
- FSDataInputStream indexIn[] = new FSDataInputStream[numSpills];
-
- for(int i = 0; i < numSpills; i++) {
- filename[i] = mapOutputFile.getSpillFile(getTaskId(), i);
- in[i] = localFs.open(filename[i]);
- indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i);
- indexIn[i] = localFs.open(indexFileName[i]);
- }
-
- //create a sorter object as we need access to the SegmentDescriptor
- //class and merge methods
- Sorter sorter = new Sorter(localFs, keyClass, valClass, job);
- sorter.setFactor(numSpills);
-
- for (int parts = 0; parts < partitions; parts++){
- List<SegmentDescriptor> segmentList = new ArrayList(numSpills);
+ //spawn a thread to give merge progress heartbeats
+ Thread sortProgress = new Thread() {
+ public void run() {
+ while (true) {
+ try {
+ reportProgress(umbilical);
+ Thread.sleep(PROGRESS_INTERVAL);
+ } catch (InterruptedException e) {
+ return;
+ } catch (Throwable e) {
+ LOG.info("Thread Exception in " +
+ "reporting sort progress\n" +
+ StringUtils.stringifyException(e));
+ continue;
+ }
+ }
+ }
+ };
+ sortProgress.setName("Sort progress reporter for task "+getTaskId());
+ sortProgress.setDaemon(true);
+ sortProgress.start();
+ try {
+ Path [] filename = new Path[numSpills];
+ Path [] indexFileName = new Path[numSpills];
+ FSDataInputStream in[] = new FSDataInputStream[numSpills];
+ FSDataInputStream indexIn[] = new FSDataInputStream[numSpills];
+
for(int i = 0; i < numSpills; i++) {
- long segmentOffset = indexIn[i].readLong();
- long segmentLength = indexIn[i].readLong();
- SegmentDescriptor s = sorter.new SegmentDescriptor(segmentOffset,
- segmentLength, filename[i]);
- s.preserveInput(true);
- s.doSync();
- segmentList.add(i, s);
+ filename[i] = mapOutputFile.getSpillFile(getTaskId(), i);
+ in[i] = localFs.open(filename[i]);
+ indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i);
+ indexIn[i] = localFs.open(indexFileName[i]);
}
- segmentStart = finalOut.getPos();
- SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut,
- job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
- compressionType, codec);
- sorter.writeFile(sorter.merge(segmentList), writer);
- //add a sync block - required esp. for block compression to ensure
- //partition data don't span partition boundaries
- writer.sync();
- //when we write the offset/length to the final index file, we write
- //longs for both. This helps us to reliably seek directly to the
- //offset/length for a partition when we start serving the byte-ranges
- //to the reduces. We probably waste some space in the file by doing
- //this as opposed to writing VLong but it helps us later on.
- finalIndexOut.writeLong(segmentStart);
- finalIndexOut.writeLong(finalOut.getPos()-segmentStart);
- }
- finalOut.close();
- finalIndexOut.close();
- //cleanup
- for(int i = 0; i < numSpills; i++) {
- in[i].close(); localFs.delete(filename[i]);
- indexIn[i].close(); localFs.delete(indexFileName[i]);
+
+ //create a sorter object as we need access to the SegmentDescriptor
+ //class and merge methods
+ Sorter sorter = new Sorter(localFs, keyClass, valClass, job);
+ sorter.setFactor(numSpills);
+
+ for (int parts = 0; parts < partitions; parts++){
+ List<SegmentDescriptor> segmentList = new ArrayList(numSpills);
+ for(int i = 0; i < numSpills; i++) {
+ long segmentOffset = indexIn[i].readLong();
+ long segmentLength = indexIn[i].readLong();
+ SegmentDescriptor s = sorter.new SegmentDescriptor(segmentOffset,
+ segmentLength, filename[i]);
+ s.preserveInput(true);
+ s.doSync();
+ segmentList.add(i, s);
+ }
+ segmentStart = finalOut.getPos();
+ SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut,
+ job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
+ compressionType, codec);
+ sorter.writeFile(sorter.merge(segmentList), writer);
+ //add a sync block - required esp. for block compression to ensure
+ //partition data don't span partition boundaries
+ writer.sync();
+ //when we write the offset/length to the final index file, we write
+ //longs for both. This helps us to reliably seek directly to the
+ //offset/length for a partition when we start serving the byte-ranges
+ //to the reduces. We probably waste some space in the file by doing
+ //this as opposed to writing VLong but it helps us later on.
+ finalIndexOut.writeLong(segmentStart);
+ finalIndexOut.writeLong(finalOut.getPos()-segmentStart);
+ }
+ finalOut.close();
+ finalIndexOut.close();
+ //cleanup
+ for(int i = 0; i < numSpills; i++) {
+ in[i].close(); localFs.delete(filename[i]);
+ indexIn[i].close(); localFs.delete(indexFileName[i]);
+ }
+ } finally {
+ sortProgress.interrupt();
}
}
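A side note on the index-file comment in the hunk above: because the final
index file stores a fixed-width pair of longs (offset, length) per partition,
a server can seek straight to partition p at byte p * 16 without scanning.
A hypothetical reader illustrating that layout (RandomAccessFile stands in
for Hadoop's FSDataInputStream; the class name and CLI arguments are
assumptions, not code from the patch):

import java.io.IOException;
import java.io.RandomAccessFile;

public class IndexRecordSketch {
  static final int RECORD_SIZE = 16; // two big-endian longs: offset + length

  public static void main(String[] args) throws IOException {
    int partition = Integer.parseInt(args[1]);   // which reduce wants data
    try (RandomAccessFile index = new RandomAccessFile(args[0], "r")) {
      index.seek((long) partition * RECORD_SIZE); // jump directly to the record
      long offset = index.readLong();             // where the partition starts
      long length = index.readLong();             // how many bytes it spans
      System.out.println("partition " + partition +
                         ": offset=" + offset + " length=" + length);
    }
  }
}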