Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/05/07 23:51:05 UTC
svn commit: r654292 - in /hadoop/core/trunk: CHANGES.txt
    src/java/org/apache/hadoop/mapred/MapTask.java
    src/java/org/apache/hadoop/mapred/ReduceTask.java
    src/java/org/apache/hadoop/mapred/Task.java
Author: omalley
Date: Wed May 7 14:51:03 2008
New Revision: 654292
URL: http://svn.apache.org/viewvc?rev=654292&view=rev
Log:
HADOOP-3226. Run combiners multiple times over map outputs as they
are merged in both the map and the reduce tasks. Contributed by Chris
Douglas.
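
Note: since this change allows the combiner to run more than once over the
same data, a combiner must be associative and commutative, and its output
types must match its input types so that combined output can be combined
again. A minimal sketch of such a combiner in the old mapred API (the class
name is hypothetical, not part of this commit):

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    // Safe to apply repeatedly: summing partial sums gives the same
    // result as summing the original values.
    public class SumCombiner extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
      public void reduce(Text key, Iterator<IntWritable> values,
                         OutputCollector<Text, IntWritable> output,
                         Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
          sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
      }
    }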
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=654292&r1=654291&r2=654292&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed May 7 14:51:03 2008
@@ -41,6 +41,9 @@
and jobHistory log. Also adds web UI for viewing input splits in job UI
and history UI. (Amareshwari Sriramadasu via ddas)
+ HADOOP-3226. Run combiners multiple times over map outputs as they
+ are merged in both the map and the reduce tasks. (cdouglas via omalley)
+
NEW FEATURES
HADOOP-3074. Provides a UrlStreamHandler for DFS and other FS,
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=654292&r1=654291&r2=654292&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Wed May 7 14:51:03 2008
@@ -250,6 +250,8 @@
private Reporter reporter = null;
+ private final Counters.Counter mapOutputRecordCounter;
+
@SuppressWarnings("unchecked")
public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
JobConf job, Reporter reporter) throws IOException {
@@ -258,6 +260,9 @@
FileSystem fs = FileSystem.get(job);
out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
+
+ Counters counters = getCounters();
+ mapOutputRecordCounter = counters.findCounter(MAP_OUTPUT_RECORDS);
}
public void close() throws IOException {
@@ -272,7 +277,8 @@
public void collect(K key, V value) throws IOException {
reporter.progress();
- this.out.write(key, value);
+ out.write(key, value);
+ mapOutputRecordCounter.increment(1);
}
}
@@ -324,6 +330,7 @@
private volatile Throwable sortSpillException = null;
private final int softRecordLimit;
private final int softBufferLimit;
+ private final int minSpillsForCombine;
private final Object spillLock = new Object();
private final QuickSort sorter = new QuickSort();
private final BlockingBuffer bb = new BlockingBuffer();
@@ -381,11 +388,12 @@
Counters counters = getCounters();
mapOutputByteCounter = counters.findCounter(MAP_OUTPUT_BYTES);
mapOutputRecordCounter = counters.findCounter(MAP_OUTPUT_RECORDS);
- combineInputCounter = getCounters().findCounter(COMBINE_INPUT_RECORDS);
+ combineInputCounter = counters.findCounter(COMBINE_INPUT_RECORDS);
combineOutputCounter = counters.findCounter(COMBINE_OUTPUT_RECORDS);
// combiner and compression
compressMapOutput = job.getCompressMapOutput();
combinerClass = job.getCombinerClass();
+ minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
if (compressMapOutput) {
compressionType = job.getMapOutputCompressionType();
Class<? extends CompressionCodec> codecClass =
@@ -415,7 +423,7 @@
deflateFilter = null;
}
combineCollector = (null != combinerClass)
- ? new CombineOutputCollector(reporter)
+ ? new CombineOutputCollector(combineOutputCounter)
: null;
}
@@ -784,11 +792,10 @@
// to remedy this would require us to observe the compression
// strategy here as we do in collect
if (spstart != spindex) {
- Reducer combiner =
- (Reducer)ReflectionUtils.newInstance(combinerClass, job);
combineCollector.setWriter(writer);
- combineAndSpill(spstart, spindex, combiner, combineCollector);
- // combineAndSpill closes combiner
+ RawKeyValueIterator kvIter =
+ new MRResultIterator(spstart, spindex);
+ combineAndSpill(kvIter, combineInputCounter);
}
}
// we need to close the writer to flush buffered data, obtaining
@@ -873,16 +880,17 @@
}
@SuppressWarnings("unchecked")
- private void combineAndSpill(int start, int end, Reducer combiner,
- OutputCollector combineCollector) throws IOException {
+ private void combineAndSpill(RawKeyValueIterator kvIter,
+ Counters.Counter inCounter) throws IOException {
+ Reducer combiner =
+ (Reducer)ReflectionUtils.newInstance(combinerClass, job);
try {
CombineValuesIterator values = new CombineValuesIterator(
- new MRResultIterator(start, end), comparator, keyClass, valClass,
- job, reporter);
+ kvIter, comparator, keyClass, valClass, job, reporter,
+ inCounter);
while (values.more()) {
combiner.reduce(values.getKey(), values, combineCollector, reporter);
values.nextKey();
- combineOutputCounter.increment(1);
// indicate we're making progress
reporter.progress();
}
@@ -951,23 +959,6 @@
public void close() { }
}
- private class CombineValuesIterator<KEY,VALUE>
- extends ValuesIterator<KEY,VALUE> {
-
- public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in,
- RawComparator<KEY> comparator, Class<KEY> keyClass,
- Class<VALUE> valClass, Configuration conf, Reporter reporter)
- throws IOException {
- super(in, comparator, keyClass, valClass, conf, reporter);
- }
-
- @Override
- public VALUE next() {
- combineInputCounter.increment(1);
- return super.next();
- }
- }
-
private void mergeParts() throws IOException {
// get the approximate size of the final output/index files
long finalOutFileSize = 0;
@@ -1049,7 +1040,12 @@
SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut,
job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
compressionType, codec);
- sorter.writeFile(kvIter, writer);
+ if (null == combinerClass || numSpills < minSpillsForCombine) {
+ sorter.writeFile(kvIter, writer);
+ } else {
+ combineCollector.setWriter(writer);
+ combineAndSpill(kvIter, combineInputCounter);
+ }
//close the file - required esp. for block compression to ensure
//partition data don't span partition boundaries
writer.close();
@@ -1074,25 +1070,6 @@
}
/**
- * OutputCollector for the combiner.
- */
- private static class CombineOutputCollector implements OutputCollector {
- private Reporter reporter;
- private SequenceFile.Writer writer;
- public CombineOutputCollector(Reporter reporter) {
- this.reporter = reporter;
- }
- public synchronized void setWriter(SequenceFile.Writer writer) {
- this.writer = writer;
- }
- public synchronized void collect(Object key, Object value)
- throws IOException {
- reporter.progress();
- writer.append(key, value);
- }
- }
-
- /**
* Exception indicating that the allocated sort buffer is insufficient
* to hold the current record.
*/
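
Note: the MapTask hunk above adds the "min.num.spills.for.combine" setting:
during the final merge of spill files, the combiner is re-run only when one
is configured and at least that many spills exist (default 3). A hedged
sketch of setting it from job code, assuming the stock LongSumReducer as the
combiner (the wrapper class is hypothetical):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.LongSumReducer;

    public class CombineTuning {
      public static JobConf configure() {
        JobConf conf = new JobConf(CombineTuning.class);
        // Any associative, commutative reducer can serve as a combiner.
        conf.setCombinerClass(LongSumReducer.class);
        // Re-run the combiner during the final merge only when the map
        // task produced at least this many spill files (the default is 3).
        conf.setInt("min.num.spills.for.combine", 3);
        return conf;
      }
    }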
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=654292&r1=654291&r2=654292&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed May 7 14:51:03 2008
@@ -18,10 +18,6 @@
package org.apache.hadoop.mapred;
-import static org.apache.hadoop.mapred.Task.Counter.REDUCE_INPUT_GROUPS;
-import static org.apache.hadoop.mapred.Task.Counter.REDUCE_INPUT_RECORDS;
-import static org.apache.hadoop.mapred.Task.Counter.REDUCE_OUTPUT_RECORDS;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
@@ -98,12 +94,16 @@
private Progress sortPhase = getProgress().addPhase("sort");
private Progress reducePhase = getProgress().addPhase("reduce");
private Counters.Counter reduceInputKeyCounter =
- getCounters().findCounter(REDUCE_INPUT_GROUPS);
+ getCounters().findCounter(Counter.REDUCE_INPUT_GROUPS);
private Counters.Counter reduceInputValueCounter =
- getCounters().findCounter(REDUCE_INPUT_RECORDS);
+ getCounters().findCounter(Counter.REDUCE_INPUT_RECORDS);
private Counters.Counter reduceOutputCounter =
- getCounters().findCounter(REDUCE_OUTPUT_RECORDS);
-
+ getCounters().findCounter(Counter.REDUCE_OUTPUT_RECORDS);
+ private Counters.Counter reduceCombineInputCounter =
+ getCounters().findCounter(Counter.COMBINE_INPUT_RECORDS);
+ private Counters.Counter reduceCombineOutputCounter =
+ getCounters().findCounter(Counter.COMBINE_OUTPUT_RECORDS);
+
// A custom comparator for map output files. Here the ordering is determined
// by the file's size and path. In case of files with same size and different
// file paths, the first parameter is considered smaller than the second one.
@@ -568,6 +568,16 @@
private int maxFetchRetriesPerMap;
/**
+ * Combiner class to run during in-memory merge, if defined.
+ */
+ private final Class<? extends Reducer> combinerClass;
+
+ /**
+ * Resettable collector used for combine.
+ */
+ private final CombineOutputCollector combineCollector;
+
+ /**
* Maximum percent of failed fetch attempt before killing the reduce task.
*/
private static final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
@@ -945,6 +955,10 @@
this.copyResults = new ArrayList<CopyResult>(100);
this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
+ this.combinerClass = conf.getCombinerClass();
+ combineCollector = (null != combinerClass)
+ ? new CombineOutputCollector(reduceCombineOutputCounter)
+ : null;
this.ioSortFactor = conf.getInt("io.sort.factor", 10);
// the exponential backoff formula
@@ -1639,15 +1653,22 @@
SequenceFile.Sorter.RawKeyValueIterator rIter;
try {
rIter = sorter.merge(inMemClosedFiles, true,
- inMemClosedFiles.length, new Path(reduceTask.getTaskID().toString()));
+ inMemClosedFiles.length,
+ new Path(reduceTask.getTaskID().toString()));
+ if (null == combinerClass) {
+ sorter.writeFile(rIter, writer);
+ } else {
+ combineCollector.setWriter(writer);
+ combineAndSpill(rIter, reduceCombineInputCounter);
+ }
} catch (Exception e) {
//make sure that we delete the ondisk file that we created
//earlier when we invoked cloneFileAttributes
writer.close();
localFileSys.delete(outputPath, true);
- throw new IOException (StringUtils.stringifyException(e));
+ throw (IOException)new IOException
+ ("Intermedate merge failed").initCause(e);
}
- sorter.writeFile(rIter, writer);
writer.close();
LOG.info(reduceTask.getTaskID() +
" Merge of the " +inMemClosedFiles.length +
@@ -1679,6 +1700,31 @@
return file.toString().endsWith(".out");
}
};
+
+ @SuppressWarnings("unchecked")
+ private void combineAndSpill(
+ SequenceFile.Sorter.RawKeyValueIterator kvIter,
+ Counters.Counter inCounter) throws IOException {
+ JobConf job = (JobConf)getConf();
+ Reducer combiner =
+ (Reducer)ReflectionUtils.newInstance(combinerClass, job);
+ Class keyClass = job.getMapOutputKeyClass();
+ Class valClass = job.getMapOutputValueClass();
+ RawComparator comparator = job.getOutputKeyComparator();
+ try {
+ CombineValuesIterator values = new CombineValuesIterator(
+ kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
+ inCounter);
+ while (values.more()) {
+ combiner.reduce(values.getKey(), values, combineCollector,
+ Reporter.NULL);
+ values.nextKey();
+ }
+ } finally {
+ combiner.close();
+ }
+ }
+
}
private static int getClosestPowerOf2(int value) {
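
Note on the exception change above: IOException only gained a
(String, Throwable) constructor in Java 6, so the code chains the cause via
Throwable.initCause(), which returns Throwable and must be cast back. The
same pattern in isolation (the failing merge() call is a hypothetical
stand-in):

    import java.io.IOException;

    public class ChainedIOException {
      static void merge() throws Exception { /* hypothetical failing call */ }

      public static void run() throws IOException {
        try {
          merge();
        } catch (Exception e) {
          // initCause returns Throwable, hence the cast back to IOException.
          throw (IOException) new IOException("Intermediate merge failed")
              .initCause(e);
        }
      }
    }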
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=654292&r1=654291&r2=654292&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed May 7 14:51:03 2008
@@ -40,8 +40,11 @@
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.kfs.KosmosFileSystem;
import org.apache.hadoop.fs.s3.S3FileSystem;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
@@ -589,4 +592,43 @@
}
}
}
+
+ /**
+ * OutputCollector for the combiner.
+ */
+ protected static class CombineOutputCollector implements OutputCollector {
+ private SequenceFile.Writer writer;
+ private Counters.Counter outCounter;
+ public CombineOutputCollector(Counters.Counter outCounter) {
+ this.outCounter = outCounter;
+ }
+ public synchronized void setWriter(SequenceFile.Writer writer) {
+ this.writer = writer;
+ }
+ public synchronized void collect(Object key, Object value)
+ throws IOException {
+ outCounter.increment(1);
+ writer.append(key, value);
+ }
+ }
+
+ protected static class CombineValuesIterator<KEY,VALUE>
+ extends ValuesIterator<KEY,VALUE> {
+
+ private final Counters.Counter combineInputCounter;
+
+ public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in,
+ RawComparator<KEY> comparator, Class<KEY> keyClass,
+ Class<VALUE> valClass, Configuration conf, Reporter reporter,
+ Counters.Counter combineInputCounter) throws IOException {
+ super(in, comparator, keyClass, valClass, conf, reporter);
+ this.combineInputCounter = combineInputCounter;
+ }
+
+ public VALUE next() {
+ combineInputCounter.increment(1);
+ return super.next();
+ }
+ }
+
}
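
Note: the two helpers moved into Task.java centralize the combine counters:
CombineValuesIterator increments COMBINE_INPUT_RECORDS as each value is
consumed, and CombineOutputCollector increments COMBINE_OUTPUT_RECORDS as
each record is written. A sketch of how they cooperate, paraphrasing the
combineAndSpill() loops in the diffs above (kvIter, spillWriter, and the
counters are assumed to come from the caller):

    // Retarget the shared collector at the writer for this spill, then
    // feed grouped values through the combiner; counting happens inside
    // the iterator and collector rather than at each call site.
    CombineOutputCollector collector =
        new CombineOutputCollector(combineOutputCounter);
    collector.setWriter(spillWriter);
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, reporter,
        combineInputCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, collector, reporter);
      values.nextKey();
    }
    combiner.close();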