You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/06/19 02:42:50 UTC

svn commit: r669342 - in /hadoop/core/branches/branch-0.18: CHANGES.txt src/mapred/org/apache/hadoop/mapred/JobConf.java src/mapred/org/apache/hadoop/mapred/MapTask.java src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Author: cdouglas
Date: Wed Jun 18 17:42:49 2008
New Revision: 669342

URL: http://svn.apache.org/viewvc?rev=669342&view=rev
Log:
HADOOP-3586. Provide deprecated, backwards compatible semantics for the
combiner to be run once and only once on each record.

Modified:
    hadoop/core/branches/branch-0.18/CHANGES.txt
    hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/JobConf.java
    hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=669342&r1=669341&r2=669342&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Wed Jun 18 17:42:49 2008
@@ -617,6 +617,9 @@
     HADOOP-3526. Fix contrib/data_join framework by cloning values retained
     in the reduce. (Spyros Blanas via cdouglas)
 
+    HADOOP-3586. Provide deprecated, backwards compatible semantics for the
+    combiner to be run once and only once on each record. (cdouglas)
+
 Release 0.17.1 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=669342&r1=669341&r2=669342&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/JobConf.java Wed Jun 18 17:42:49 2008
@@ -799,6 +799,20 @@
   }
   
   /**
+   * If true, ensures the combiner is run once and only once on output from
+   * the map. Otherwise, combiner may be run zero or more times.
+   */
+  @Deprecated
+  public void setCombineOnceOnly(JobConf conf, boolean value) {
+    conf.setBoolean("mapred.combine.once", value);
+  }
+
+  @Deprecated
+  public boolean getCombineOnceOnly() {
+    return getBoolean("mapred.combine.once", false);
+  }
+
+  /**
    * Should speculative execution be used for this job? 
    * Defaults to <code>true</code>.
    * 

Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=669342&r1=669341&r2=669342&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java Wed Jun 18 17:42:49 2008
@@ -31,7 +31,9 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -826,11 +828,30 @@
             writer = new IFile.Writer(job, out, keyClass, valClass, codec);
 
             if (i == partition) {
-              final long recordStart = out.getPos();
-              writer.append(key, value);
-              // Note that our map byte count will not be accurate with
-              // compression
-              mapOutputByteCounter.increment(out.getPos() - recordStart);
+              if (job.getCombineOnceOnly()) {
+                Reducer combiner =
+                  (Reducer)ReflectionUtils.newInstance(combinerClass, job);
+                combineCollector.setWriter(writer);
+                combiner.reduce(key, new Iterator<V>() {
+                    private boolean done = false;
+                    public boolean hasNext() { return !done; }
+                    public V next() {
+                      if (done)
+                        throw new NoSuchElementException();
+                      done = true;
+                      return value;
+                    }
+                    public void remove() {
+                      throw new UnsupportedOperationException();
+                    }
+                  }, combineCollector, reporter);
+              } else {
+                final long recordStart = out.getPos();
+                writer.append(key, value);
+                // Note that our map byte count will not be accurate with
+                // compression
+                mapOutputByteCounter.increment(out.getPos() - recordStart);
+              }
             }
             writer.close();
 
@@ -1030,7 +1051,8 @@
           segmentStart = finalOut.getPos();
           Writer<K, V> writer = 
               new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
-          if (null == combinerClass || numSpills < minSpillsForCombine) {
+          if (null == combinerClass || job.getCombineOnceOnly() ||
+              numSpills < minSpillsForCombine) {
             Merger.writeFile(kvIter, writer, reporter);
           } else {
             combineCollector.setWriter(writer);

Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=669342&r1=669341&r2=669342&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Wed Jun 18 17:42:49 2008
@@ -1286,7 +1286,9 @@
       this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
       this.maxInFlight = 4 * numCopiers;
       this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
-      this.combinerClass = conf.getCombinerClass();
+      this.combinerClass = conf.getCombineOnceOnly()
+        ? null
+        : conf.getCombinerClass();
       combineCollector = (null != combinerClass)
         ? new CombineOutputCollector(reduceCombineOutputCounter)
         : null;