You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by su...@apache.org on 2012/12/17 00:27:29 UTC

svn commit: r1422716 - in /hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/...

Author: suresh
Date: Sun Dec 16 23:27:26 2012
New Revision: 1422716

URL: http://svn.apache.org/viewvc?rev=1422716&view=rev
Log:
Merge trunk into branch-trunk-win

Added:
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexRecord.java
      - copied unchanged from r1422713, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexRecord.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapOutputCollector.java
      - copied unchanged from r1422713, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapOutputCollector.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java
      - copied unchanged from r1422713, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java
Modified:
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (contents, props changed)

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1422281-1422713

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt Sun Dec 16 23:27:26 2012
@@ -14,6 +14,8 @@ Trunk (Unreleased)
     MAPREDUCE-4049. Experimental api to allow for alternate shuffle plugins.
     (Avner BenHanoch via acmurthy) 
 
+    MAPREDUCE-4807. Allow MapOutputBuffer to be pluggable. (masokan via tucu)
+
   IMPROVEMENTS
 
     MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for
@@ -71,6 +73,9 @@ Trunk (Unreleased)
     MAPREDUCE-4735. Make arguments in TestDFSIO case insensitive.
     (Brandon Li via suresh)
 
+    MAPREDUCE-4809. Change visibility of classes for pluggable sort changes. 
+    (masokan via tucu)
+
   BUG FIXES
 
     MAPREDUCE-4356. [Rumen] Provide access to the method

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1422281-1422713

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1422281-1422713

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Sun Dec 16 23:27:26 2012
@@ -34,6 +34,8 @@ import java.util.concurrent.locks.Reentr
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -54,6 +56,7 @@ import org.apache.hadoop.io.serializer.S
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskCounter;
@@ -71,7 +74,9 @@ import org.apache.hadoop.util.StringInte
 import org.apache.hadoop.util.StringUtils;
 
 /** A Map task. */
-class MapTask extends Task {
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class MapTask extends Task {
   /**
    * The size of each record in the index file for the map-outputs.
    */
@@ -338,6 +343,10 @@ class MapTask extends Task {
     done(umbilical, reporter);
   }
 
+  public Progress getSortPhase() {
+    return sortPhase;
+  }
+
  @SuppressWarnings("unchecked")
  private <T> T getSplitDetails(Path file, long offset) 
   throws IOException {
@@ -367,6 +376,22 @@ class MapTask extends Task {
  }
   
   @SuppressWarnings("unchecked")
+  private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
+          createSortingCollector(JobConf job, TaskReporter reporter)
+    throws IOException, ClassNotFoundException {
+    MapOutputCollector<KEY, VALUE> collector
+      = (MapOutputCollector<KEY, VALUE>)
+       ReflectionUtils.newInstance(
+                        job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
+                        MapOutputBuffer.class, MapOutputCollector.class), job);
+    LOG.info("Map output collector class = " + collector.getClass().getName());
+    MapOutputCollector.Context context =
+                           new MapOutputCollector.Context(this, job, reporter);
+    collector.init(context);
+    return collector;
+  }
+
+  @SuppressWarnings("unchecked")
   private <INKEY,INVALUE,OUTKEY,OUTVALUE>
   void runOldMapper(final JobConf job,
                     final TaskSplitIndex splitIndex,
@@ -388,11 +413,14 @@ class MapTask extends Task {
 
     int numReduceTasks = conf.getNumReduceTasks();
     LOG.info("numReduceTasks: " + numReduceTasks);
-    MapOutputCollector collector = null;
+    MapOutputCollector<OUTKEY, OUTVALUE> collector = null;
     if (numReduceTasks > 0) {
-      collector = new MapOutputBuffer(umbilical, job, reporter);
+      collector = createSortingCollector(job, reporter);
     } else { 
-      collector = new DirectMapOutputCollector(umbilical, job, reporter);
+      collector = new DirectMapOutputCollector<OUTKEY, OUTVALUE>();
+       MapOutputCollector.Context context =
+                           new MapOutputCollector.Context(this, job, reporter);
+      collector.init(context);
     }
     MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
       ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
@@ -638,7 +666,7 @@ class MapTask extends Task {
                        TaskUmbilicalProtocol umbilical,
                        TaskReporter reporter
                        ) throws IOException, ClassNotFoundException {
-      collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
+      collector = createSortingCollector(job, reporter);
       partitions = jobContext.getNumReduceTasks();
       if (partitions > 1) {
         partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
@@ -734,17 +762,6 @@ class MapTask extends Task {
     output.close(mapperContext);
   }
 
-  interface MapOutputCollector<K, V> {
-
-    public void collect(K key, V value, int partition
-                        ) throws IOException, InterruptedException;
-    public void close() throws IOException, InterruptedException;
-    
-    public void flush() throws IOException, InterruptedException, 
-                               ClassNotFoundException;
-        
-  }
-
   class DirectMapOutputCollector<K, V>
     implements MapOutputCollector<K, V> {
  
@@ -752,14 +769,18 @@ class MapTask extends Task {
 
     private TaskReporter reporter = null;
 
-    private final Counters.Counter mapOutputRecordCounter;
-    private final Counters.Counter fileOutputByteCounter;
-    private final List<Statistics> fsStats;
+    private Counters.Counter mapOutputRecordCounter;
+    private Counters.Counter fileOutputByteCounter;
+    private List<Statistics> fsStats;
+
+    public DirectMapOutputCollector() {
+    }
 
     @SuppressWarnings("unchecked")
-    public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
-        JobConf job, TaskReporter reporter) throws IOException {
-      this.reporter = reporter;
+    public void init(MapOutputCollector.Context context
+                    ) throws IOException, ClassNotFoundException {
+      this.reporter = context.getReporter();
+      JobConf job = context.getJobConf();
       String finalName = getOutputName(getPartition());
       FileSystem fs = FileSystem.get(job);
 
@@ -814,25 +835,27 @@ class MapTask extends Task {
     }
   }
 
-  private class MapOutputBuffer<K extends Object, V extends Object>
+  @InterfaceAudience.LimitedPrivate({"MapReduce"})
+  @InterfaceStability.Unstable
+  public static class MapOutputBuffer<K extends Object, V extends Object>
       implements MapOutputCollector<K, V>, IndexedSortable {
-    final int partitions;
-    final JobConf job;
-    final TaskReporter reporter;
-    final Class<K> keyClass;
-    final Class<V> valClass;
-    final RawComparator<K> comparator;
-    final SerializationFactory serializationFactory;
-    final Serializer<K> keySerializer;
-    final Serializer<V> valSerializer;
-    final CombinerRunner<K,V> combinerRunner;
-    final CombineOutputCollector<K, V> combineCollector;
+    private int partitions;
+    private JobConf job;
+    private TaskReporter reporter;
+    private Class<K> keyClass;
+    private Class<V> valClass;
+    private RawComparator<K> comparator;
+    private SerializationFactory serializationFactory;
+    private Serializer<K> keySerializer;
+    private Serializer<V> valSerializer;
+    private CombinerRunner<K,V> combinerRunner;
+    private CombineOutputCollector<K, V> combineCollector;
 
     // Compression for map-outputs
-    final CompressionCodec codec;
+    private CompressionCodec codec;
 
     // k/v accounting
-    final IntBuffer kvmeta; // metadata overlay on backing store
+    private IntBuffer kvmeta; // metadata overlay on backing store
     int kvstart;            // marks origin of spill metadata
     int kvend;              // marks end of spill metadata
     int kvindex;            // marks end of fully serialized records
@@ -856,15 +879,15 @@ class MapTask extends Task {
     private static final int METASIZE = NMETA * 4; // size in bytes
 
     // spill accounting
-    final int maxRec;
-    final int softLimit;
+    private int maxRec;
+    private int softLimit;
     boolean spillInProgress;;
     int bufferRemaining;
     volatile Throwable sortSpillException = null;
 
     int numSpills = 0;
-    final int minSpillsForCombine;
-    final IndexedSorter sorter;
+    private int minSpillsForCombine;
+    private IndexedSorter sorter;
     final ReentrantLock spillLock = new ReentrantLock();
     final Condition spillDone = spillLock.newCondition();
     final Condition spillReady = spillLock.newCondition();
@@ -872,12 +895,12 @@ class MapTask extends Task {
     volatile boolean spillThreadRunning = false;
     final SpillThread spillThread = new SpillThread();
 
-    final FileSystem rfs;
+    private FileSystem rfs;
 
     // Counters
-    final Counters.Counter mapOutputByteCounter;
-    final Counters.Counter mapOutputRecordCounter;
-    final Counters.Counter fileOutputByteCounter;
+    private Counters.Counter mapOutputByteCounter;
+    private Counters.Counter mapOutputRecordCounter;
+    private Counters.Counter fileOutputByteCounter;
 
     final ArrayList<SpillRecord> indexCacheList =
       new ArrayList<SpillRecord>();
@@ -885,12 +908,23 @@ class MapTask extends Task {
     private int indexCacheMemoryLimit;
     private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024;
 
+    private MapTask mapTask;
+    private MapOutputFile mapOutputFile;
+    private Progress sortPhase;
+    private Counters.Counter spilledRecordsCounter;
+
+    public MapOutputBuffer() {
+    }
+
     @SuppressWarnings("unchecked")
-    public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
-                           TaskReporter reporter
-                           ) throws IOException, ClassNotFoundException {
-      this.job = job;
-      this.reporter = reporter;
+    public void init(MapOutputCollector.Context context
+                    ) throws IOException, ClassNotFoundException {
+      job = context.getJobConf();
+      reporter = context.getReporter();
+      mapTask = context.getMapTask();
+      mapOutputFile = mapTask.getMapOutputFile();
+      sortPhase = mapTask.getSortPhase();
+      spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
       partitions = job.getNumReduceTasks();
       rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
 
@@ -967,7 +1001,7 @@ class MapTask extends Task {
       if (combinerRunner != null) {
         final Counters.Counter combineOutputCounter =
           reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
-        combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, conf);
+        combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
       } else {
         combineCollector = null;
       }
@@ -1118,6 +1152,10 @@ class MapTask extends Task {
       }
     }
 
+    private TaskAttemptID getTaskID() {
+      return mapTask.getTaskID();
+    }
+
     /**
      * Set the point from which meta and serialization data expand. The meta
      * indices are aligned with the buffer, so metadata never spans the ends of
@@ -1490,7 +1528,7 @@ class MapTask extends Task {
         if (lspillException instanceof Error) {
           final String logMsg = "Task " + getTaskID() + " failed : " +
             StringUtils.stringifyException(lspillException);
-          reportFatalError(getTaskID(), lspillException, logMsg);
+          mapTask.reportFatalError(getTaskID(), lspillException, logMsg);
         }
         throw new IOException("Spill failed", lspillException);
       }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java Sun Dec 16 23:27:26 2012
@@ -26,6 +26,8 @@ import java.util.zip.CheckedInputStream;
 import java.util.zip.CheckedOutputStream;
 import java.util.zip.Checksum;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -34,7 +36,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.PureJavaCrc32;
 
-class SpillRecord {
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class SpillRecord {
 
   /** Backing store */
   private final ByteBuffer buf;
@@ -143,17 +147,3 @@ class SpillRecord {
   }
 
 }
-
-class IndexRecord {
-  long startOffset;
-  long rawLength;
-  long partLength;
-
-  public IndexRecord() { }
-
-  public IndexRecord(long startOffset, long rawLength, long partLength) {
-    this.startOffset = startOffset;
-    this.rawLength = rawLength;
-    this.partLength = partLength;
-  }
-}

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Sun Dec 16 23:27:26 2012
@@ -584,9 +584,9 @@ abstract public class Task implements Wr
     return status;
   }
 
-  @InterfaceAudience.Private
+  @InterfaceAudience.LimitedPrivate({"MapReduce"})
   @InterfaceStability.Unstable
-  protected class TaskReporter 
+  public class TaskReporter 
       extends org.apache.hadoop.mapreduce.StatusReporter
       implements Runnable, Reporter {
     private TaskUmbilicalProtocol umbilical;
@@ -1466,9 +1466,9 @@ abstract public class Task implements Wr
     return reducerContext;
   }
 
-  @InterfaceAudience.Private
+  @InterfaceAudience.LimitedPrivate({"MapReduce"})
   @InterfaceStability.Unstable
-  protected static abstract class CombinerRunner<K,V> {
+  public static abstract class CombinerRunner<K,V> {
     protected final Counters.Counter inputCounter;
     protected final JobConf job;
     protected final TaskReporter reporter;
@@ -1486,13 +1486,13 @@ abstract public class Task implements Wr
      * @param iterator the key/value pairs to use as input
      * @param collector the output collector
      */
-    abstract void combine(RawKeyValueIterator iterator, 
+    public abstract void combine(RawKeyValueIterator iterator, 
                           OutputCollector<K,V> collector
                          ) throws IOException, InterruptedException, 
                                   ClassNotFoundException;
 
     @SuppressWarnings("unchecked")
-    static <K,V> 
+    public static <K,V> 
     CombinerRunner<K,V> create(JobConf job,
                                TaskAttemptID taskId,
                                Counters.Counter inputCounter,
@@ -1542,7 +1542,7 @@ abstract public class Task implements Wr
     }
 
     @SuppressWarnings("unchecked")
-    protected void combine(RawKeyValueIterator kvIter,
+    public void combine(RawKeyValueIterator kvIter,
                            OutputCollector<K,V> combineCollector
                            ) throws IOException {
       Reducer<K,V,K,V> combiner = 
@@ -1611,7 +1611,7 @@ abstract public class Task implements Wr
 
     @SuppressWarnings("unchecked")
     @Override
-    void combine(RawKeyValueIterator iterator, 
+    public void combine(RawKeyValueIterator iterator, 
                  OutputCollector<K,V> collector
                  ) throws IOException, InterruptedException,
                           ClassNotFoundException {

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Sun Dec 16 23:27:26 2012
@@ -30,6 +30,9 @@ public interface MRJobConfig {
 
   public static final String MAP_CLASS_ATTR = "mapreduce.job.map.class";
 
+  public static final String MAP_OUTPUT_COLLECTOR_CLASS_ATTR
+                                  = "mapreduce.job.map.output.collector.class";
+
   public static final String COMBINE_CLASS_ATTR = "mapreduce.job.combine.class";
 
   public static final String REDUCE_CLASS_ATTR = "mapreduce.job.reduce.class";

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java Sun Dec 16 23:27:26 2012
@@ -17,9 +17,14 @@
  */
 package org.apache.hadoop.mapreduce.task.reduce;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
 /**
  * An interface for reporting exceptions to other threads
  */
-interface ExceptionReporter {
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public interface ExceptionReporter {
   void reportException(Throwable t);
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java Sun Dec 16 23:27:26 2012
@@ -20,9 +20,14 @@ package org.apache.hadoop.mapreduce.task
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
-class MapHost {
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class MapHost {
   
   public static enum State {
     IDLE,               // No map outputs available

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java Sun Dec 16 23:27:26 2012
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -33,7 +35,9 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
-class MapOutput<K,V> {
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class MapOutput<K,V> {
   private static final Log LOG = LogFactory.getLog(MapOutput.class);
   private static AtomicInteger ID = new AtomicInteger(0);
   
@@ -62,7 +66,7 @@ class MapOutput<K,V> {
   
   private final boolean primaryMapOutput;
   
-  MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, long size, 
+  public MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, long size, 
             JobConf conf, LocalDirAllocator localDirAllocator,
             int fetcher, boolean primaryMapOutput, MapOutputFile mapOutputFile)
          throws IOException {
@@ -87,7 +91,7 @@ class MapOutput<K,V> {
     this.primaryMapOutput = primaryMapOutput;
   }
   
-  MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, int size, 
+  public MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, int size, 
             boolean primaryMapOutput) {
     this.id = ID.incrementAndGet();
     this.mapId = mapId;

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Sun Dec 16 23:27:26 2012
@@ -59,7 +59,7 @@ import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
 
 @SuppressWarnings(value={"unchecked", "deprecation"})
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
 @InterfaceStability.Unstable
 public class MergeManager<K, V> {
   

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Sun Dec 16 23:27:26 2012
@@ -39,7 +39,7 @@ import org.apache.hadoop.mapreduce.MRJob
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.Progress;
 
-@InterfaceAudience.LimitedPrivate("mapreduce")
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
 @InterfaceStability.Unstable
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionReporter {

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java Sun Dec 16 23:27:26 2012
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.mapreduce.task.reduce;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -25,7 +28,9 @@ import org.apache.hadoop.metrics.Metrics
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.Updater;
 
-class ShuffleClientMetrics implements Updater {
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class ShuffleClientMetrics implements Updater {
 
   private MetricsRecord shuffleMetrics = null;
   private int numFailedFetches = 0;

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Sun Dec 16 23:27:26 2012
@@ -938,4 +938,12 @@
   <value>jhs/_HOST@REALM.TLD</value>
 </property>
 
+<property>
+  <name>mapreduce.job.map.output.collector.class</name>
+  <value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value>
+  <description>
+    The name of the MapOutputCollector implementation class that map tasks instantiate to collect map output when the job has reduce tasks.
+  </description>
+</property>
+
 </configuration>

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1422281-1422713