Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/05/07 23:51:05 UTC

svn commit: r654292 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/MapTask.java src/java/org/apache/hadoop/mapred/ReduceTask.java src/java/org/apache/hadoop/mapred/Task.java

Author: omalley
Date: Wed May  7 14:51:03 2008
New Revision: 654292

URL: http://svn.apache.org/viewvc?rev=654292&view=rev
Log:
HADOOP-3226. Run combiners multiple times over map outputs as they
are merged in both the map and the reduce tasks. Contributed by Chris
Douglas.
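For reference, a minimal sketch of a job that would exercise the new code path (the combiner class and driver wiring here are hypothetical; the min.num.spills.for.combine key and its default of 3 come from the patch below):

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    // Hypothetical word-count style combiner: any Reducer whose input and
    // output types match the map output types can be registered as a combiner.
    public class SumCombiner extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
      public void reduce(Text key, Iterator<IntWritable> values,
          OutputCollector<Text, IntWritable> output, Reporter reporter)
          throws IOException {
        int sum = 0;
        while (values.hasNext()) {
          sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
      }
    }

With this change, a combiner registered via job.setCombinerClass(SumCombiner.class) may run not only when a map-side spill is written, but again when spill files are merged on the map side (if there are at least min.num.spills.for.combine of them) and during the in-memory merge on the reduce side.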

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=654292&r1=654291&r2=654292&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed May  7 14:51:03 2008
@@ -41,6 +41,9 @@
     and jobHistory log. Also adds web UI for viewing input splits in job UI 
     and history UI. (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-3226. Run combiners multiple times over map outputs as they
+    are merged in both the map and the reduce tasks. (cdouglas via omalley)
+
   NEW FEATURES
 
     HADOOP-3074. Provides a UrlStreamHandler for DFS and other FS,

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=654292&r1=654291&r2=654292&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Wed May  7 14:51:03 2008
@@ -250,6 +250,8 @@
 
     private Reporter reporter = null;
 
+    private final Counters.Counter mapOutputRecordCounter;
+
     @SuppressWarnings("unchecked")
     public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
         JobConf job, Reporter reporter) throws IOException {
@@ -258,6 +260,9 @@
       FileSystem fs = FileSystem.get(job);
 
       out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
+
+      Counters counters = getCounters();
+      mapOutputRecordCounter = counters.findCounter(MAP_OUTPUT_RECORDS);
     }
 
     public void close() throws IOException {
@@ -272,7 +277,8 @@
 
     public void collect(K key, V value) throws IOException {
       reporter.progress();
-      this.out.write(key, value);
+      out.write(key, value);
+      mapOutputRecordCounter.increment(1);
     }
     
   }
@@ -324,6 +330,7 @@
     private volatile Throwable sortSpillException = null;
     private final int softRecordLimit;
     private final int softBufferLimit;
+    private final int minSpillsForCombine;
     private final Object spillLock = new Object();
     private final QuickSort sorter = new QuickSort();
     private final BlockingBuffer bb = new BlockingBuffer();
@@ -381,11 +388,12 @@
       Counters counters = getCounters();
       mapOutputByteCounter = counters.findCounter(MAP_OUTPUT_BYTES);
       mapOutputRecordCounter = counters.findCounter(MAP_OUTPUT_RECORDS);
-      combineInputCounter = getCounters().findCounter(COMBINE_INPUT_RECORDS);
+      combineInputCounter = counters.findCounter(COMBINE_INPUT_RECORDS);
       combineOutputCounter = counters.findCounter(COMBINE_OUTPUT_RECORDS);
       // combiner and compression
       compressMapOutput = job.getCompressMapOutput();
       combinerClass = job.getCombinerClass();
+      minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
       if (compressMapOutput) {
         compressionType = job.getMapOutputCompressionType();
         Class<? extends CompressionCodec> codecClass =
@@ -415,7 +423,7 @@
         deflateFilter = null;
       }
       combineCollector = (null != combinerClass)
-        ? new CombineOutputCollector(reporter)
+        ? new CombineOutputCollector(combineOutputCounter)
         : null;
     }
 
@@ -784,11 +792,10 @@
               // to remedy this would require us to observe the compression
               // strategy here as we do in collect
               if (spstart != spindex) {
-                Reducer combiner =
-                  (Reducer)ReflectionUtils.newInstance(combinerClass, job);
                 combineCollector.setWriter(writer);
-                combineAndSpill(spstart, spindex, combiner, combineCollector);
-                // combineAndSpill closes combiner
+                RawKeyValueIterator kvIter =
+                  new MRResultIterator(spstart, spindex);
+                combineAndSpill(kvIter, combineInputCounter);
               }
             }
             // we need to close the writer to flush buffered data, obtaining
@@ -873,16 +880,17 @@
     }
 
     @SuppressWarnings("unchecked")
-    private void combineAndSpill(int start, int end, Reducer combiner,
-        OutputCollector combineCollector) throws IOException {
+    private void combineAndSpill(RawKeyValueIterator kvIter,
+        Counters.Counter inCounter) throws IOException {
+      Reducer combiner =
+        (Reducer)ReflectionUtils.newInstance(combinerClass, job);
       try {
         CombineValuesIterator values = new CombineValuesIterator(
-            new MRResultIterator(start, end), comparator, keyClass, valClass,
-            job, reporter);
+            kvIter, comparator, keyClass, valClass, job, reporter,
+            inCounter);
         while (values.more()) {
           combiner.reduce(values.getKey(), values, combineCollector, reporter);
           values.nextKey();
-          combineOutputCounter.increment(1);
           // indicate we're making progress
           reporter.progress();
         }
@@ -951,23 +959,6 @@
       public void close() { }
     }
 
-    private class CombineValuesIterator<KEY,VALUE>
-        extends ValuesIterator<KEY,VALUE> {
-
-      public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in,
-          RawComparator<KEY> comparator, Class<KEY> keyClass,
-          Class<VALUE> valClass, Configuration conf, Reporter reporter)
-          throws IOException {
-        super(in, comparator, keyClass, valClass, conf, reporter);
-      }
-
-      @Override
-      public VALUE next() {
-        combineInputCounter.increment(1);
-        return super.next();
-      }
-    }
-
     private void mergeParts() throws IOException {
       // get the approximate size of the final output/index files
       long finalOutFileSize = 0;
@@ -1049,7 +1040,12 @@
           SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut, 
                                                                  job.getMapOutputKeyClass(), job.getMapOutputValueClass(), 
                                                                  compressionType, codec);
-          sorter.writeFile(kvIter, writer);
+          if (null == combinerClass || numSpills < minSpillsForCombine) {
+            sorter.writeFile(kvIter, writer);
+          } else {
+            combineCollector.setWriter(writer);
+            combineAndSpill(kvIter, combineInputCounter);
+          }
           //close the file - required esp. for block compression to ensure
           //partition data don't span partition boundaries
           writer.close();
@@ -1074,25 +1070,6 @@
   }
 
   /**
-   * OutputCollector for the combiner.
-   */
-  private static class CombineOutputCollector implements OutputCollector {
-    private Reporter reporter;
-    private SequenceFile.Writer writer;
-    public CombineOutputCollector(Reporter reporter) {
-      this.reporter = reporter;
-    }
-    public synchronized void setWriter(SequenceFile.Writer writer) {
-      this.writer = writer;
-    }
-    public synchronized void collect(Object key, Object value)
-        throws IOException {
-        reporter.progress();
-        writer.append(key, value);
-    }
-  }
-
-  /**
    * Exception indicating that the allocated sort buffer is insufficient
    * to hold the current record.
    */
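A standalone sketch of the decision added to mergeParts() above (class and method names are illustrative, not the actual MapTask internals): the combiner is re-run over the merged spill stream only when one is configured and enough spills were produced.

    // Illustrative only: mirrors the "null == combinerClass || numSpills < minSpillsForCombine" test.
    final class MergeCombinePolicy {
      private final boolean hasCombiner;
      private final int minSpillsForCombine; // "min.num.spills.for.combine", default 3

      MergeCombinePolicy(boolean hasCombiner, int minSpillsForCombine) {
        this.hasCombiner = hasCombiner;
        this.minSpillsForCombine = minSpillsForCombine;
      }

      /** True if the combiner should run during the final merge of spill files. */
      boolean combineDuringFinalMerge(int numSpills) {
        return hasCombiner && numSpills >= minSpillsForCombine;
      }

      public static void main(String[] args) {
        MergeCombinePolicy policy = new MergeCombinePolicy(true, 3);
        System.out.println(policy.combineDuringFinalMerge(2)); // false: records are written as-is
        System.out.println(policy.combineDuringFinalMerge(5)); // true: combineAndSpill over the merged iterator
      }
    }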

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=654292&r1=654291&r2=654292&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed May  7 14:51:03 2008
@@ -18,10 +18,6 @@
 
 package org.apache.hadoop.mapred;
 
-import static org.apache.hadoop.mapred.Task.Counter.REDUCE_INPUT_GROUPS;
-import static org.apache.hadoop.mapred.Task.Counter.REDUCE_INPUT_RECORDS;
-import static org.apache.hadoop.mapred.Task.Counter.REDUCE_OUTPUT_RECORDS;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.File;
@@ -98,12 +94,16 @@
   private Progress sortPhase  = getProgress().addPhase("sort");
   private Progress reducePhase = getProgress().addPhase("reduce");
   private Counters.Counter reduceInputKeyCounter = 
-    getCounters().findCounter(REDUCE_INPUT_GROUPS);
+    getCounters().findCounter(Counter.REDUCE_INPUT_GROUPS);
   private Counters.Counter reduceInputValueCounter = 
-    getCounters().findCounter(REDUCE_INPUT_RECORDS);
+    getCounters().findCounter(Counter.REDUCE_INPUT_RECORDS);
   private Counters.Counter reduceOutputCounter = 
-    getCounters().findCounter(REDUCE_OUTPUT_RECORDS);
-  
+    getCounters().findCounter(Counter.REDUCE_OUTPUT_RECORDS);
+  private Counters.Counter reduceCombineInputCounter =
+    getCounters().findCounter(Counter.COMBINE_INPUT_RECORDS);
+  private Counters.Counter reduceCombineOutputCounter =
+    getCounters().findCounter(Counter.COMBINE_OUTPUT_RECORDS);
+
   // A custom comparator for map output files. Here the ordering is determined
   // by the file's size and path. In case of files with same size and different
   // file paths, the first parameter is considered smaller than the second one.
@@ -568,6 +568,16 @@
     private int maxFetchRetriesPerMap;
     
     /**
+     * Combiner class to run during in-memory merge, if defined.
+     */
+    private final Class<? extends Reducer> combinerClass;
+
+    /**
+     * Resettable collector used for combine.
+     */
+    private final CombineOutputCollector combineCollector;
+
+    /**
      * Maximum percent of failed fetch attempt before killing the reduce task.
      */
     private static final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
@@ -945,6 +955,10 @@
       this.copyResults = new ArrayList<CopyResult>(100);    
       this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
       this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
+      this.combinerClass = conf.getCombinerClass();
+      combineCollector = (null != combinerClass)
+        ? new CombineOutputCollector(reduceCombineOutputCounter)
+        : null;
       
       this.ioSortFactor = conf.getInt("io.sort.factor", 10);
       // the exponential backoff formula
@@ -1639,15 +1653,22 @@
             SequenceFile.Sorter.RawKeyValueIterator rIter;
             try {
               rIter = sorter.merge(inMemClosedFiles, true, 
-                                   inMemClosedFiles.length, new Path(reduceTask.getTaskID().toString()));
+                                   inMemClosedFiles.length, 
+                                   new Path(reduceTask.getTaskID().toString()));
+              if (null == combinerClass) {
+                sorter.writeFile(rIter, writer);
+              } else {
+                combineCollector.setWriter(writer);
+                combineAndSpill(rIter, reduceCombineInputCounter);
+              }
             } catch (Exception e) { 
               //make sure that we delete the ondisk file that we created 
               //earlier when we invoked cloneFileAttributes
               writer.close();
               localFileSys.delete(outputPath, true);
-              throw new IOException (StringUtils.stringifyException(e));
+              throw (IOException)new IOException
+                      ("Intermedate merge failed").initCause(e);
             }
-            sorter.writeFile(rIter, writer);
             writer.close();
             LOG.info(reduceTask.getTaskID() + 
                      " Merge of the " +inMemClosedFiles.length +
@@ -1679,6 +1700,31 @@
           return file.toString().endsWith(".out");
         }     
       };
+
+    @SuppressWarnings("unchecked")
+    private void combineAndSpill(
+        SequenceFile.Sorter.RawKeyValueIterator kvIter,
+        Counters.Counter inCounter) throws IOException {
+      JobConf job = (JobConf)getConf();
+      Reducer combiner =
+        (Reducer)ReflectionUtils.newInstance(combinerClass, job);
+      Class keyClass = job.getMapOutputKeyClass();
+      Class valClass = job.getMapOutputValueClass();
+      RawComparator comparator = job.getOutputKeyComparator();
+      try {
+        CombineValuesIterator values = new CombineValuesIterator(
+            kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
+            inCounter);
+        while (values.more()) {
+          combiner.reduce(values.getKey(), values, combineCollector,
+              Reporter.NULL);
+          values.nextKey();
+        }
+      } finally {
+        combiner.close();
+      }
+    }
+
   }
 
   private static int getClosestPowerOf2(int value) {
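The error handling above also changes from stringifying the stack trace to chaining the original exception. A small sketch of that idiom (the triggering failure here is made up): IOException had no (String, Throwable) constructor in the Java versions targeted at the time, so the cause is attached via initCause and the result cast back.

    import java.io.IOException;

    public class ChainedIOExceptionSketch {
      // Stand-in for the intermediate merge; the RuntimeException is a simulated failure.
      static void intermediateMerge() throws IOException {
        try {
          throw new RuntimeException("simulated merge failure");
        } catch (Exception e) {
          // initCause returns Throwable, hence the cast back to IOException.
          throw (IOException) new IOException("Intermediate merge failed").initCause(e);
        }
      }

      public static void main(String[] args) {
        try {
          intermediateMerge();
        } catch (IOException ioe) {
          ioe.printStackTrace(); // prints the chained cause instead of a flattened string
        }
      }
    }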

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=654292&r1=654291&r2=654292&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed May  7 14:51:03 2008
@@ -40,8 +40,11 @@
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.kfs.KosmosFileSystem;
 import org.apache.hadoop.fs.s3.S3FileSystem;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -589,4 +592,43 @@
       }
     }
   }
+
+  /**
+   * OutputCollector for the combiner.
+   */
+  protected static class CombineOutputCollector implements OutputCollector {
+    private SequenceFile.Writer writer;
+    private Counters.Counter outCounter;
+    public CombineOutputCollector(Counters.Counter outCounter) {
+      this.outCounter = outCounter;
+    }
+    public synchronized void setWriter(SequenceFile.Writer writer) {
+      this.writer = writer;
+    }
+    public synchronized void collect(Object key, Object value)
+        throws IOException {
+      outCounter.increment(1);
+      writer.append(key, value);
+    }
+  }
+
+  protected static class CombineValuesIterator<KEY,VALUE>
+      extends ValuesIterator<KEY,VALUE> {
+
+    private final Counters.Counter combineInputCounter;
+
+    public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in,
+        RawComparator<KEY> comparator, Class<KEY> keyClass,
+        Class<VALUE> valClass, Configuration conf, Reporter reporter,
+        Counters.Counter combineInputCounter) throws IOException {
+      super(in, comparator, keyClass, valClass, conf, reporter);
+      this.combineInputCounter = combineInputCounter;
+    }
+
+    public VALUE next() {
+      combineInputCounter.increment(1);
+      return super.next();
+    }
+  }
+
 }
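
To summarize the counter accounting now centralized in Task: CombineValuesIterator bumps COMBINE_INPUT_RECORDS for every value handed to the combiner, and CombineOutputCollector bumps COMBINE_OUTPUT_RECORDS for every record it emits, on both the map and reduce sides. A toy sketch of that pattern (plain Java stand-ins, not the Hadoop classes above):

    import java.util.Arrays;
    import java.util.Iterator;

    public class CombineCounterSketch {
      static long combineInputRecords = 0;   // stand-in for COMBINE_INPUT_RECORDS
      static long combineOutputRecords = 0;  // stand-in for COMBINE_OUTPUT_RECORDS

      // Wrap an iterator so every value pulled by the combiner is counted,
      // analogous to CombineValuesIterator.next().
      static <V> Iterator<V> countingInput(final Iterator<V> in) {
        return new Iterator<V>() {
          public boolean hasNext() { return in.hasNext(); }
          public V next() { combineInputRecords++; return in.next(); }
          public void remove() { in.remove(); }
        };
      }

      public static void main(String[] args) {
        Iterator<Integer> values = countingInput(Arrays.asList(1, 2, 3).iterator());
        int sum = 0;
        while (values.hasNext()) {
          sum += values.next();
        }
        combineOutputRecords++;  // one combined record emitted, as CombineOutputCollector.collect would count
        System.out.println("sum=" + sum
            + " combine input=" + combineInputRecords
            + " combine output=" + combineOutputRecords);
      }
    }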