You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/06/19 02:42:50 UTC
svn commit: r669342 - in /hadoop/core/branches/branch-0.18: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/JobConf.java
src/mapred/org/apache/hadoop/mapred/MapTask.java
src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Author: cdouglas
Date: Wed Jun 18 17:42:49 2008
New Revision: 669342
URL: http://svn.apache.org/viewvc?rev=669342&view=rev
Log:
HADOOP-3586. Provide deprecated, backwards compatible semantics for the
combiner to be run once and only once on each record.
Modified:
hadoop/core/branches/branch-0.18/CHANGES.txt
hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/JobConf.java
hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=669342&r1=669341&r2=669342&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Wed Jun 18 17:42:49 2008
@@ -617,6 +617,9 @@
HADOOP-3526. Fix contrib/data_join framework by cloning values retained
in the reduce. (Spyros Blanas via cdouglas)
+ HADOOP-3586. Provide deprecated, backwards compatible semantics for the
+ combiner to be run once and only once on each record. (cdouglas)
+
Release 0.17.1 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=669342&r1=669341&r2=669342&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/JobConf.java Wed Jun 18 17:42:49 2008
@@ -799,6 +799,20 @@
}
/**
+ * If true, ensures the combiner is run once and only once on output from
+ * the map. Otherwise, combiner may be run zero or more times.
+ */
+ @Deprecated
+ public void setCombineOnceOnly(JobConf conf, boolean value) {
+ conf.setBoolean("mapred.combine.once", value);
+ }
+
+ @Deprecated
+ public boolean getCombineOnceOnly() {
+ return getBoolean("mapred.combine.once", false);
+ }
+
+ /**
* Should speculative execution be used for this job?
* Defaults to <code>true</code>.
*
Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=669342&r1=669341&r2=669342&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java Wed Jun 18 17:42:49 2008
@@ -31,7 +31,9 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -826,11 +828,30 @@
writer = new IFile.Writer(job, out, keyClass, valClass, codec);
if (i == partition) {
- final long recordStart = out.getPos();
- writer.append(key, value);
- // Note that our map byte count will not be accurate with
- // compression
- mapOutputByteCounter.increment(out.getPos() - recordStart);
+ if (job.getCombineOnceOnly()) {
+ Reducer combiner =
+ (Reducer)ReflectionUtils.newInstance(combinerClass, job);
+ combineCollector.setWriter(writer);
+ combiner.reduce(key, new Iterator<V>() {
+ private boolean done = false;
+ public boolean hasNext() { return !done; }
+ public V next() {
+ if (done)
+ throw new NoSuchElementException();
+ done = true;
+ return value;
+ }
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }, combineCollector, reporter);
+ } else {
+ final long recordStart = out.getPos();
+ writer.append(key, value);
+ // Note that our map byte count will not be accurate with
+ // compression
+ mapOutputByteCounter.increment(out.getPos() - recordStart);
+ }
}
writer.close();
@@ -1030,7 +1051,8 @@
segmentStart = finalOut.getPos();
Writer<K, V> writer =
new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
- if (null == combinerClass || numSpills < minSpillsForCombine) {
+ if (null == combinerClass || job.getCombineOnceOnly() ||
+ numSpills < minSpillsForCombine) {
Merger.writeFile(kvIter, writer, reporter);
} else {
combineCollector.setWriter(writer);
Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=669342&r1=669341&r2=669342&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Wed Jun 18 17:42:49 2008
@@ -1286,7 +1286,9 @@
this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
this.maxInFlight = 4 * numCopiers;
this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
- this.combinerClass = conf.getCombinerClass();
+ this.combinerClass = conf.getCombineOnceOnly()
+ ? null
+ : conf.getCombinerClass();
combineCollector = (null != combinerClass)
? new CombineOutputCollector(reduceCombineOutputCounter)
: null;