Posted to common-commits@hadoop.apache.org by tu...@apache.org on 2013/07/25 01:11:17 UTC

svn commit: r1506774 - in /hadoop/common/branches/branch-1: ./ src/mapred/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/ src/test/org/apache/hadoop/mapred/

Author: tucu
Date: Wed Jul 24 23:11:17 2013
New Revision: 1506774

URL: http://svn.apache.org/r1506774
Log:
MAPREDUCE-2454. plugin for generic shuffle service. (avnerb via tucu)

Added:
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleProviderPlugin.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestShufflePlugin.java
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1506774&r1=1506773&r2=1506774&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Wed Jul 24 23:11:17 2013
@@ -6,6 +6,8 @@ Release 1.3.0 - unreleased
 
   NEW FEATURES
 
+    MAPREDUCE-2454. plugin for generic shuffle service. (avnerb via tucu)
+
   IMPROVEMENTS
 
     HADOOP-9450. HADOOP_USER_CLASSPATH_FIRST is not honored; CLASSPATH

Modified: hadoop/common/branches/branch-1/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/mapred-default.xml?rev=1506774&r1=1506773&r2=1506774&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-1/src/mapred/mapred-default.xml Wed Jul 24 23:11:17 2013
@@ -221,6 +221,24 @@
 </property>
 
 <property>
+  <name>mapreduce.job.reduce.shuffle.consumer.plugin.class</name>
+  <value>org.apache.hadoop.mapred.ReduceTask$ReduceCopier</value>
+  <description>Name of the class whose instance will be used
+   to send shuffle requests by reduce tasks of this job.
+   The class must implement org.apache.hadoop.mapred.ShuffleConsumerPlugin.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.shuffle.provider.plugin.classes</name>
+  <value>org.apache.hadoop.mapred.TaskTracker$DefaultShuffleProvider</value>
+  <description>A comma-separated list of classes that should be loaded as ShuffleProviderPlugin(s).
+   A ShuffleProviderPlugin can serve shuffle requests from reduce tasks.
+   Each class in the list must implement org.apache.hadoop.mapred.ShuffleProviderPlugin.
+  </description>
+</property>
+
+<property>
   <name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name>
   <value>5000</value>
   <description>The interval, in milliseconds, for which the tasktracker waits

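For reference, the new mapreduce.job.reduce.shuffle.consumer.plugin.class property is read per job; leaving it unset keeps the built-in ReduceTask$ReduceCopier. A minimal sketch of the equivalent programmatic form follows (com.example.MyShuffleConsumer is a hypothetical class name standing in for any implementation of org.apache.hadoop.mapred.ShuffleConsumerPlugin; in practice the same key/value pair would normally be set in the job's configuration file or mapred-site.xml):

import org.apache.hadoop.mapred.JobConf;

public class ShuffleConsumerConfigExample {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf();
    // Select a non-default shuffle consumer for this job.
    // "com.example.MyShuffleConsumer" is a hypothetical 3rd party class that
    // must implement org.apache.hadoop.mapred.ShuffleConsumerPlugin; leaving
    // the property unset keeps the built-in ReduceTask$ReduceCopier.
    jobConf.set("mapreduce.job.reduce.shuffle.consumer.plugin.class",
                "com.example.MyShuffleConsumer");
  }
}

ReduceTask resolves this key via JobContext.SHUFFLE_CONSUMER_PLUGIN_ATTR and instantiates the class with ReflectionUtils.newInstance, as shown in the ReduceTask.java hunk below; the provider-side list is loaded separately by the TaskTracker from its own configuration.
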
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1506774&r1=1506773&r2=1506774&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Wed Jul 24 23:11:17 2013
@@ -77,6 +77,7 @@ import org.apache.hadoop.mapred.Merger.S
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.metrics2.MetricsBuilder;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
@@ -94,7 +95,7 @@ import org.apache.hadoop.metrics2.lib.Me
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 
 /** A Reduce task. */
-class ReduceTask extends Task {
+public class ReduceTask extends Task {
 
   static {                                        // register a ctor
     WritableFactories.setFactory
@@ -106,7 +107,6 @@ class ReduceTask extends Task {
   
   private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
   private int numMaps;
-  private ReduceCopier reduceCopier;
 
   private CompressionCodec codec;
 
@@ -379,16 +379,28 @@ class ReduceTask extends Task {
     
     // Initialize the codec
     codec = initCodec();
+    ShuffleConsumerPlugin shuffleConsumerPlugin = null;
 
     boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
     if (!isLocal) {
-      reduceCopier = new ReduceCopier(umbilical, job, reporter);
-      if (!reduceCopier.fetchOutputs()) {
-        if(reduceCopier.mergeThrowable instanceof FSError) {
-          throw (FSError)reduceCopier.mergeThrowable;
+      // load the ShuffleConsumerPlugin according to the configuration file
+      // +++ NOTE: This code supports loading 3rd party plugins at runtime +++
+      //
+      Class<? extends ShuffleConsumerPlugin> clazz =
+               job.getClass(JobContext.SHUFFLE_CONSUMER_PLUGIN_ATTR, ReduceCopier.class, ShuffleConsumerPlugin.class);
+
+      shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
+      LOG.info(" Using ShuffleConsumerPlugin : " + shuffleConsumerPlugin);
+
+      ShuffleConsumerPlugin.Context context = new ShuffleConsumerPlugin.Context(ReduceTask.this, umbilical, conf, reporter);
+      shuffleConsumerPlugin.init(context);
+
+      if (!shuffleConsumerPlugin.fetchOutputs()) {
+        if(shuffleConsumerPlugin.getMergeThrowable() instanceof FSError) {
+          throw (FSError)shuffleConsumerPlugin.getMergeThrowable();
         }
         throw new IOException("Task: " + getTaskID() + 
-            " - The reduce copier failed", reduceCopier.mergeThrowable);
+            " - The ShuffleConsumerPlugin " + clazz.getSimpleName() + " failed", shuffleConsumerPlugin.getMergeThrowable());
       }
     }
     copyPhase.complete();                         // copy is already complete
@@ -402,7 +414,7 @@ class ReduceTask extends Task {
           !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
           new Path(getTaskID().toString()), job.getOutputKeyComparator(),
           reporter, spilledRecordsCounter, null)
-      : reduceCopier.createKVIterator(job, rfs, reporter);
+      : shuffleConsumerPlugin.createKVIterator(job, rfs, reporter);
         
     // free up the data structures
     mapOutputFilesOnDisk.clear();
@@ -421,6 +433,9 @@ class ReduceTask extends Task {
       runOldReducer(job, umbilical, reporter, rIter, comparator, 
                     keyClass, valueClass);
     }
+    if (shuffleConsumerPlugin != null) {
+      shuffleConsumerPlugin.close();
+    }
     done(umbilical, reporter);
   }
 
@@ -658,11 +673,11 @@ class ReduceTask extends Task {
     OTHER_ERROR
   };
 
-  class ReduceCopier<K, V> implements MRConstants {
+  public static class ReduceCopier<K, V> implements ShuffleConsumerPlugin, MRConstants {
 
     /** Reference to the umbilical object */
     private TaskUmbilicalProtocol umbilical;
-    private final TaskReporter reporter;
+    private TaskReporter reporter;
     
     /** Reference to the task object */
     
@@ -749,18 +764,18 @@ class ReduceTask extends Task {
     /**
      * When we accumulate maxInMemOutputs number of files in ram, we merge/spill
      */
-    private final int maxInMemOutputs;
+    private int maxInMemOutputs;
 
     /**
      * Usage threshold for in-memory output accumulation.
      */
-    private final float maxInMemCopyPer;
+    private float maxInMemCopyPer;
 
     /**
      * Maximum memory usage of map outputs to merge from memory into
      * the reduce, in bytes.
      */
-    private final long maxInMemReduce;
+    private long maxInMemReduce;
 
     /**
      * The threads for fetching the files.
@@ -810,7 +825,7 @@ class ReduceTask extends Task {
     /**
      * Maximum number of fetch failures before reducer aborts.
      */
-    private final int abortFailureLimit;
+    private int abortFailureLimit;
 
     /**
      * Initial penalty time in ms for a fetch failure.
@@ -918,8 +933,8 @@ class ReduceTask extends Task {
       ShuffleClientInstrumentation(JobConf conf) {
         registry.tag("user", "User name", conf.getUser())
                 .tag("jobName", "Job name", conf.getJobName())
-                .tag("jobId", "Job ID", ReduceTask.this.getJobID().toString())
-                .tag("taskId", "Task ID", getTaskID().toString())
+                .tag("jobId", "Job ID", reduceTask.getJobID().toString())
+                .tag("taskId", "Task ID", reduceTask.getTaskID().toString())
                 .tag("sessionId", "Session ID", conf.getSessionId());
       }
 
@@ -960,7 +975,7 @@ class ReduceTask extends Task {
 
     private ShuffleClientInstrumentation createShuffleClientInstrumentation() {
       return DefaultMetricsSystem.INSTANCE.register("ShuffleClientMetrics",
-          "Shuffle input metrics", new ShuffleClientInstrumentation(conf));
+          "Shuffle input metrics", new ShuffleClientInstrumentation(reduceTask.conf));
     }
 
     /** Represents the result of an attempt to copy a map output */
@@ -1353,15 +1368,15 @@ class ReduceTask extends Task {
             LOG.error("Task: " + reduceTask.getTaskID() + " - FSError: " + 
                       StringUtils.stringifyException(e));
             try {
-              umbilical.fsError(reduceTask.getTaskID(), e.getMessage(), jvmContext);
+              umbilical.fsError(reduceTask.getTaskID(), e.getMessage(), reduceTask.jvmContext);
             } catch (IOException io) {
               LOG.error("Could not notify TT of FSError: " + 
                       StringUtils.stringifyException(io));
             }
           } catch (Throwable th) {
-            String msg = getTaskID() + " : Map output copy failure : " 
+            String msg = reduceTask.getTaskID() + " : Map output copy failure : " 
                          + StringUtils.stringifyException(th);
-            reportFatalError(getTaskID(), th, msg);
+            reduceTask.reportFatalError(reduceTask.getTaskID(), th, msg);
           }
         }
         
@@ -1410,7 +1425,7 @@ class ReduceTask extends Task {
         long bytes = mapOutput.compressedSize;
         
         // lock the ReduceTask while we do the rename
-        synchronized (ReduceTask.this) {
+        synchronized (reduceTask) {
           if (copiedMapOutputs.contains(loc.getTaskId())) {
             mapOutput.discard();
             return CopyResult.OBSOLETE;
@@ -1446,7 +1461,7 @@ class ReduceTask extends Task {
                   tmpMapOutput + " to " + filename);
             }
 
-            synchronized (mapOutputFilesOnDisk) {        
+            synchronized (reduceTask.mapOutputFilesOnDisk) {        
               FileStatus fileStatus = localFileSys.getFileStatus(filename);
               CompressAwareFileStatus compressedFileStatus = new CompressAwareFileStatus(
                   fileStatus, mapOutput.decompressedSize);
@@ -1469,7 +1484,7 @@ class ReduceTask extends Task {
        */
       private void noteCopiedMapOutput(TaskID taskId) {
         copiedMapOutputs.add(taskId);
-        ramManager.setNumCopiedMapOutputs(numMaps - copiedMapOutputs.size());
+        ramManager.setNumCopiedMapOutputs(reduceTask.numMaps - copiedMapOutputs.size());
       }
 
       /**
@@ -1689,7 +1704,7 @@ class ReduceTask extends Task {
         }
 
         IFileInputStream checksumIn = 
-          new IFileInputStream(input,compressedLength, conf);
+          new IFileInputStream(input,compressedLength, reduceTask.conf);
 
         input = checksumIn;       
       
@@ -1798,12 +1813,12 @@ class ReduceTask extends Task {
       throws IOException {
         // Find out a suitable location for the output on local-filesystem
         Path localFilename = 
-          lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(), 
-                                         mapOutputLength, conf);
+          reduceTask.lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(), 
+                                         mapOutputLength, reduceTask.conf);
 
         MapOutput mapOutput = 
           new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(), 
-                        conf, localFileSys.makeQualified(localFilename), 
+                        reduceTask.conf, localFileSys.makeQualified(localFilename), 
                         mapOutputLength);
 
 
@@ -1870,9 +1885,9 @@ class ReduceTask extends Task {
             LOG.info("Failed to discard map-output from " + 
                 mapOutputLoc.getTaskAttemptId(), ioe);
           } catch (Throwable t) {
-            String msg = getTaskID() + " : Failed in shuffle to disk :" 
+            String msg = reduceTask.getTaskID() + " : Failed in shuffle to disk :" 
                          + StringUtils.stringifyException(t);
-            reportFatalError(getTaskID(), t, msg);
+            reduceTask.reportFatalError(reduceTask.getTaskID(), t, msg);
           }
           mapOutput = null;
 
@@ -1894,7 +1909,7 @@ class ReduceTask extends Task {
       throws IOException {
       
       // get the task and the current classloader which will become the parent
-      Task task = ReduceTask.this;
+      Task task = reduceTask;
       ClassLoader parent = conf.getClassLoader();   
       
       // get the work directory which holds the elements we are dynamically
@@ -1925,16 +1940,16 @@ class ReduceTask extends Task {
       URLClassLoader loader = new URLClassLoader(urls, parent);
       conf.setClassLoader(loader);
     }
-    
-    public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
-                        TaskReporter reporter
-                        )throws ClassNotFoundException, IOException {
+
+    @Override
+    public void init(ShuffleConsumerPlugin.Context context) throws ClassNotFoundException, IOException {
       
+      JobConf conf = context.getConf();
+      this.reporter = context.getReporter();
+      this.umbilical = context.getUmbilical();
+      this.reduceTask = context.getReduceTask();
       configureClasspath(conf);
-      this.reporter = reporter;
       this.shuffleClientMetrics = createShuffleClientInstrumentation();
-      this.umbilical = umbilical;      
-      this.reduceTask = ReduceTask.this;
 
       this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
       this.copyResults = new ArrayList<CopyResult>(100);    
@@ -1942,22 +1957,22 @@ class ReduceTask extends Task {
       this.maxInFlight = 4 * numCopiers;
       Counters.Counter combineInputCounter = 
         reporter.getCounter(Task.Counter.COMBINE_INPUT_RECORDS);
-      this.combinerRunner = CombinerRunner.create(conf, getTaskID(),
+      this.combinerRunner = CombinerRunner.create(conf, reduceTask.getTaskID(),
                                                   combineInputCounter,
                                                   reporter, null);
       if (combinerRunner != null) {
         combineCollector = 
-          new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf);
+          new CombineOutputCollector(reduceTask.reduceCombineOutputCounter, reporter, conf);
       }
       
       this.ioSortFactor = conf.getInt("io.sort.factor", 10);
       
-      this.abortFailureLimit = Math.max(30, numMaps / 10);
+      this.abortFailureLimit = Math.max(30, reduceTask.numMaps / 10);
 
       this.maxFetchFailuresBeforeReporting = conf.getInt(
           "mapreduce.reduce.shuffle.maxfetchfailures", REPORT_FAILURE_LIMIT);
 
-      this.maxFailedUniqueFetches = Math.min(numMaps, 
+      this.maxFailedUniqueFetches = Math.min(reduceTask.numMaps, 
                                              this.maxFailedUniqueFetches);
       this.maxInMemOutputs = conf.getInt("mapred.inmem.merge.threshold", 1000);
       this.maxInMemCopyPer =
@@ -1994,12 +2009,22 @@ class ReduceTask extends Task {
       this.reportReadErrorImmediately = 
         conf.getBoolean("mapreduce.reduce.shuffle.notify.readerror", true);
     }
-    
+
+    @Override
+    public Throwable getMergeThrowable() {
+      return mergeThrowable;
+    }
+
+    @Override
+    public void close(){
+    }
+
     private boolean busyEnough(int numInFlight) {
       return numInFlight > maxInFlight;
     }
     
     
+    @Override
     public boolean fetchOutputs() throws IOException {
       int totalFailures = 0;
       int            numInFlight = 0, numCopied = 0;
@@ -2010,7 +2035,7 @@ class ReduceTask extends Task {
       InMemFSMergeThread inMemFSMergeThread = null;
       GetMapEventsThread getMapEventsThread = null;
       
-      for (int i = 0; i < numMaps; i++) {
+      for (int i = 0; i < reduceTask.numMaps; i++) {
         copyPhase.addPhase();       // add sub-phase per file
       }
       
@@ -2018,7 +2043,7 @@ class ReduceTask extends Task {
       
       // start all the copying threads
       for (int i=0; i < numCopiers; i++) {
-        MapOutputCopier copier = new MapOutputCopier(conf, reporter, 
+        MapOutputCopier copier = new MapOutputCopier(reduceTask.conf, reporter, 
             reduceTask.getJobTokenSecret());
         copiers.add(copier);
         copier.start();
@@ -2042,7 +2067,7 @@ class ReduceTask extends Task {
       long lastOutputTime = 0;
       
         // loop until we get all required outputs
-        while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {
+        while (copiedMapOutputs.size() < reduceTask.numMaps && mergeThrowable == null) {
           int numEventsAtStartOfScheduling;
           synchronized (copyResultsOrNewEventsLock) {
             numEventsAtStartOfScheduling = numEventsFetched;
@@ -2056,7 +2081,7 @@ class ReduceTask extends Task {
           }
           if (logNow) {
             LOG.info(reduceTask.getTaskID() + " Need another " 
-                   + (numMaps - copiedMapOutputs.size()) + " map output(s) "
+                   + (reduceTask.numMaps - copiedMapOutputs.size()) + " map output(s) "
                    + "where " + numInFlight + " is already in progress");
           }
 
@@ -2215,15 +2240,15 @@ class ReduceTask extends Task {
             if (cr.getSuccess()) {  // a successful copy
               numCopied++;
               lastProgressTime = System.currentTimeMillis();
-              reduceShuffleBytes.increment(cr.getSize());
+              reduceTask.reduceShuffleBytes.increment(cr.getSize());
                 
               long secsSinceStart = 
                 (System.currentTimeMillis()-startTime)/1000+1;
-              float mbs = ((float)reduceShuffleBytes.getCounter())/(1024*1024);
+              float mbs = ((float)reduceTask.reduceShuffleBytes.getCounter())/(1024*1024);
               float transferRate = mbs/secsSinceStart;
                 
               copyPhase.startNextPhase();
-              copyPhase.setStatus("copy (" + numCopied + " of " + numMaps 
+              copyPhase.setStatus("copy (" + numCopied + " of " + reduceTask.numMaps 
                                   + " at " +
                                   mbpsFormat.format(transferRate) +  " MB/s)");
                 
@@ -2249,15 +2274,15 @@ class ReduceTask extends Task {
               noFailedFetches = 
                 (noFailedFetches == null) ? 1 : (noFailedFetches + 1);
               mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
-              LOG.info("Task " + getTaskID() + ": Failed fetch #" + 
+              LOG.info("Task " + reduceTask.getTaskID() + ": Failed fetch #" + 
                        noFailedFetches + " from " + mapTaskId);
 
               if (noFailedFetches >= abortFailureLimit) {
                 LOG.fatal(noFailedFetches + " failures downloading "
-                          + getTaskID() + ".");
-                umbilical.shuffleError(getTaskID(),
+                          + reduceTask.getTaskID() + ".");
+                umbilical.shuffleError(reduceTask.getTaskID(),
                                  "Exceeded the abort failure limit;"
-                                 + " bailing-out.", jvmContext);
+                                 + " bailing-out.", reduceTask.jvmContext);
               }
               
               checkAndInformJobTracker(noFailedFetches, mapTaskId,
@@ -2279,7 +2304,7 @@ class ReduceTask extends Task {
                 
                 // check if the reducer has progressed enough
                 boolean reducerProgressedEnough = 
-                    (((float)numCopied / numMaps) 
+                    (((float)numCopied / reduceTask.numMaps) 
                      >= MIN_REQUIRED_PROGRESS_PERCENT);
                 
                 // check if the reducer is stalled for a long time
@@ -2300,15 +2325,15 @@ class ReduceTask extends Task {
                 
                 // kill if not healthy and has insufficient progress
                 if ((fetchFailedMaps.size() >= maxFailedUniqueFetches ||
-                     fetchFailedMaps.size() == (numMaps - copiedMapOutputs.size()))
+                     fetchFailedMaps.size() == (reduceTask.numMaps - copiedMapOutputs.size()))
                     && !reducerHealthy 
                     && (!reducerProgressedEnough || reducerStalled)) { 
                   LOG.fatal("Shuffle failed with too many fetch failures " + 
                             "and insufficient progress!" +
-                            "Killing task " + getTaskID() + ".");
-                  umbilical.shuffleError(getTaskID(), 
+                            "Killing task " + reduceTask.getTaskID() + ".");
+                  umbilical.shuffleError(reduceTask.getTaskID(), 
                                          "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
-                                         + " bailing-out.", jvmContext);
+                                         + " bailing-out.", reduceTask.jvmContext);
                 }
 
               }
@@ -2347,9 +2372,9 @@ class ReduceTask extends Task {
         }
         
         // copiers are done, exit and notify the waiting merge threads
-        synchronized (mapOutputFilesOnDisk) {
+        synchronized (reduceTask.mapOutputFilesOnDisk) {
           exitLocalFSMerge = true;
-          mapOutputFilesOnDisk.notify();
+          reduceTask.mapOutputFilesOnDisk.notify();
         }
         
         ramManager.close();
@@ -2360,7 +2385,7 @@ class ReduceTask extends Task {
             // Wait for the on-disk merge to complete
             localFSMergerThread.join();
             LOG.info("Interleaved on-disk merge complete: " + 
-                     mapOutputFilesOnDisk.size() + " files left.");
+                     reduceTask.mapOutputFilesOnDisk.size() + " files left.");
             
             //wait for an ongoing merge (if it is in flight) to complete
             inMemFSMergeThread.join();
@@ -2377,7 +2402,7 @@ class ReduceTask extends Task {
             return false;
           }
         }
-        return mergeThrowable == null && copiedMapOutputs.size() == numMaps;
+        return mergeThrowable == null && copiedMapOutputs.size() == reduceTask.numMaps;
     }
     
     // Notify the JobTracker
@@ -2387,8 +2412,8 @@ class ReduceTask extends Task {
         int failures, TaskAttemptID mapId, boolean readError) {
       if ((reportReadErrorImmediately && readError)
           || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
-        synchronized (ReduceTask.this) {
-          taskStatus.addFetchFailedMap(mapId);
+        synchronized (reduceTask) {
+          reduceTask.taskStatus.addFetchFailedMap(mapId);
           reporter.progress();
           LOG.info("Failed to fetch map-output from " + mapId +
                    " even after MAX_FETCH_RETRIES_PER_MAP retries... "
@@ -2442,15 +2467,16 @@ class ReduceTask extends Task {
      * first merge pass. If not, then said outputs must be written to disk
      * first.
      */
+    @Override
     @SuppressWarnings("unchecked")
-    private RawKeyValueIterator createKVIterator(
+    public RawKeyValueIterator createKVIterator(
         JobConf job, FileSystem fs, Reporter reporter) throws IOException {
 
       // merge config params
       Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
       Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
       boolean keepInputs = job.getKeepFailedTaskFiles();
-      final Path tmpDir = new Path(getTaskID().toString());
+      final Path tmpDir = new Path(reduceTask.getTaskID().toString());
       final RawComparator<K> comparator =
         (RawComparator<K>)job.getOutputKeyComparator();
 
@@ -2463,15 +2489,15 @@ class ReduceTask extends Task {
             maxInMemReduce);
         final int numMemDiskSegments = memDiskSegments.size();
         if (numMemDiskSegments > 0 &&
-              ioSortFactor > mapOutputFilesOnDisk.size()) {
+              ioSortFactor > reduceTask.mapOutputFilesOnDisk.size()) {
           // must spill to disk, but can't retain in-mem for intermediate merge
           final Path outputPath =
-              mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes);
+              reduceTask.mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes);
           final RawKeyValueIterator rIter = Merger.merge(job, fs,
               keyClass, valueClass, memDiskSegments, numMemDiskSegments,
-              tmpDir, comparator, reporter, spilledRecordsCounter, null);
+              tmpDir, comparator, reporter, reduceTask.spilledRecordsCounter, null);
           Writer writer = new Writer(job, fs, outputPath,
-              keyClass, valueClass, codec, null);
+              keyClass, valueClass, reduceTask.codec, null);
           try {
             Merger.writeFile(rIter, writer, reporter, job);
             writer.close();
@@ -2508,15 +2534,15 @@ class ReduceTask extends Task {
       long onDiskBytes = inMemToDiskBytes;
       long totalDecompressedBytes = inMemToDiskBytes;
 
-      for (CompressAwareFileStatus filestatus : mapOutputFilesOnDisk) {
+      for (CompressAwareFileStatus filestatus : reduceTask.mapOutputFilesOnDisk) {
         long len = filestatus.getLen();
         onDiskBytes += len;
         diskSegments.add(new Segment<K, V>(job, fs, filestatus.getPath(),
-            codec, keepInputs, filestatus.getDecompressedSize()));
+            reduceTask.codec, keepInputs, filestatus.getDecompressedSize()));
         totalDecompressedBytes += (filestatus.getDecompressedSize() > 0) ? filestatus
             .getDecompressedSize() : len;
       }
-      LOG.info("Merging " + mapOutputFilesOnDisk.size() + " files, " +
+      LOG.info("Merging " + reduceTask.mapOutputFilesOnDisk.size() + " files, " +
                onDiskBytes + " bytes from disk");
       Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
         public int compare(Segment<K, V> o1, Segment<K, V> o2) {
@@ -2537,9 +2563,9 @@ class ReduceTask extends Task {
         diskSegments.addAll(0, memDiskSegments);
         memDiskSegments.clear();
         RawKeyValueIterator diskMerge = Merger.merge(
-            job, fs, keyClass, valueClass, codec, diskSegments,
+            job, fs, keyClass, valueClass, reduceTask.codec, diskSegments,
             ioSortFactor, numInMemSegments, tmpDir, comparator,
-            reporter, false, spilledRecordsCounter, null);
+            reporter, false, reduceTask.spilledRecordsCounter, null);
         diskSegments.clear();
         if (0 == finalSegments.size()) {
           return diskMerge;
@@ -2549,7 +2575,7 @@ class ReduceTask extends Task {
       }
       return Merger.merge(job, fs, keyClass, valueClass,
                    finalSegments, finalSegments.size(), tmpDir,
-                   comparator, reporter, spilledRecordsCounter, null);
+                   comparator, reporter, reduceTask.spilledRecordsCounter, null);
     }
 
     class RawKVIteratorReader extends IFile.Reader<K,V> {
@@ -2558,7 +2584,7 @@ class ReduceTask extends Task {
 
       public RawKVIteratorReader(RawKeyValueIterator kvIter, long size)
           throws IOException {
-        super(null, null, size, null, spilledRecordsCounter);
+        super(null, null, size, null, reduceTask.spilledRecordsCounter);
         this.kvIter = kvIter;
       }
 
@@ -2621,9 +2647,9 @@ class ReduceTask extends Task {
     }
     
     private void addToMapOutputFilesOnDisk(CompressAwareFileStatus status) {
-      synchronized (mapOutputFilesOnDisk) {
-        mapOutputFilesOnDisk.add(status);
-        mapOutputFilesOnDisk.notify();
+      synchronized (reduceTask.mapOutputFilesOnDisk) {
+        reduceTask.mapOutputFilesOnDisk.add(status);
+        reduceTask.mapOutputFilesOnDisk.notify();
       }
     }
     
@@ -2647,11 +2673,11 @@ class ReduceTask extends Task {
         try {
           LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
           while(!exitLocalFSMerge){
-            synchronized (mapOutputFilesOnDisk) {
+            synchronized (reduceTask.mapOutputFilesOnDisk) {
               while (!exitLocalFSMerge &&
-                  mapOutputFilesOnDisk.size() < (2 * ioSortFactor - 1)) {
+                  reduceTask.mapOutputFilesOnDisk.size() < (2 * ioSortFactor - 1)) {
                 LOG.info(reduceTask.getTaskID() + " Thread waiting: " + getName());
-                mapOutputFilesOnDisk.wait();
+                reduceTask.mapOutputFilesOnDisk.wait();
               }
             }
             if(exitLocalFSMerge) {//to avoid running one extra time in the end
@@ -2662,15 +2688,15 @@ class ReduceTask extends Task {
             int bytesPerSum = 
               reduceTask.getConf().getInt("io.bytes.per.checksum", 512);
             LOG.info(reduceTask.getTaskID() + "We have  " + 
-                mapOutputFilesOnDisk.size() + " map outputs on disk. " +
+                reduceTask.mapOutputFilesOnDisk.size() + " map outputs on disk. " +
                 "Triggering merge of " + ioSortFactor + " files");
             // 1. Prepare the list of files to be merged. This list is prepared
             // using a list of map output files on disk. Currently we merge
             // io.sort.factor files into 1.
-            synchronized (mapOutputFilesOnDisk) {
+            synchronized (reduceTask.mapOutputFilesOnDisk) {
               for (int i = 0; i < ioSortFactor; ++i) {
-                FileStatus filestatus = mapOutputFilesOnDisk.first();
-                mapOutputFilesOnDisk.remove(filestatus);
+                FileStatus filestatus = reduceTask.mapOutputFilesOnDisk.first();
+                reduceTask.mapOutputFilesOnDisk.remove(filestatus);
                 mapFiles.add(filestatus.getPath());
                 approxOutputSize += filestatus.getLen();
               }
@@ -2688,27 +2714,27 @@ class ReduceTask extends Task {
   
             // 2. Start the on-disk merge process
             Path outputPath = 
-              lDirAlloc.getLocalPathForWrite(mapFiles.get(0).toString(), 
-                                             approxOutputSize, conf)
-              .suffix(".merged");
-            Writer writer = 
-              new Writer(conf,rfs, outputPath, 
-                         conf.getMapOutputKeyClass(), 
-                         conf.getMapOutputValueClass(),
-                         codec, null);
-            RawKeyValueIterator iter  = null;
-            Path tmpDir = new Path(reduceTask.getTaskID().toString());
-            long decompressedBytesWritten;
-            try {
-              iter = Merger.merge(conf, rfs,
-                                  conf.getMapOutputKeyClass(),
-                                  conf.getMapOutputValueClass(),
-                                  codec, mapFiles.toArray(new Path[mapFiles.size()]), 
-                                  true, ioSortFactor, tmpDir, 
-                                  conf.getOutputKeyComparator(), reporter,
-                                  spilledRecordsCounter, null);
-              
-              Merger.writeFile(iter, writer, reporter, conf);
+              reduceTask.lDirAlloc.getLocalPathForWrite(mapFiles.get(0).toString(),
+                                             approxOutputSize, reduceTask.conf)              
+              .suffix(".merged");              
+            Writer writer =              
+              new Writer(reduceTask.conf,rfs, outputPath,              
+                         reduceTask.conf.getMapOutputKeyClass(),              
+                         reduceTask.conf.getMapOutputValueClass(),              
+                         reduceTask.codec, null);              
+            RawKeyValueIterator iter  = null;              
+            Path tmpDir = new Path(reduceTask.getTaskID().toString());              
+            long decompressedBytesWritten;              
+            try {              
+              iter = Merger.merge(reduceTask.conf, rfs,              
+                                  reduceTask.conf.getMapOutputKeyClass(),              
+                                  reduceTask.conf.getMapOutputValueClass(),              
+                                  reduceTask.codec, mapFiles.toArray(new Path[mapFiles.size()]),              
+                                  true, ioSortFactor, tmpDir,              
+                                  reduceTask.conf.getOutputKeyComparator(), reporter,              
+                                  reduceTask.spilledRecordsCounter, null);              
+            
+              Merger.writeFile(iter, writer, reporter, reduceTask.conf);              
               writer.close();
               decompressedBytesWritten = writer.decompressedBytesWritten;
             } catch (Exception e) {
@@ -2716,7 +2742,7 @@ class ReduceTask extends Task {
               throw new IOException (StringUtils.stringifyException(e));
             }
             
-            synchronized (mapOutputFilesOnDisk) {
+            synchronized (reduceTask.mapOutputFilesOnDisk) {
               FileStatus fileStatus = localFileSys.getFileStatus(outputPath);
               CompressAwareFileStatus compressedFileStatus = new CompressAwareFileStatus(
                   fileStatus, decompressedBytesWritten);
@@ -2738,9 +2764,9 @@ class ReduceTask extends Task {
             mergeThrowable = e;
           }
         } catch (Throwable t) {
-          String msg = getTaskID() + " : Failed to merge on the local FS" 
+          String msg = reduceTask.getTaskID() + " : Failed to merge on the local FS" 
                        + StringUtils.stringifyException(t);
-          reportFatalError(getTaskID(), t, msg);
+          reduceTask.reportFatalError(reduceTask.getTaskID(), t, msg);
         }
       }
     }
@@ -2768,9 +2794,9 @@ class ReduceTask extends Task {
                    + StringUtils.stringifyException(e));
           ReduceCopier.this.mergeThrowable = e;
         } catch (Throwable t) {
-          String msg = getTaskID() + " : Failed to merge in memory" 
+          String msg = reduceTask.getTaskID() + " : Failed to merge in memory" 
                        + StringUtils.stringifyException(t);
-          reportFatalError(getTaskID(), t, msg);
+          reduceTask.reportFatalError(reduceTask.getTaskID(), t, msg);
         }
       }
       
@@ -2796,29 +2822,29 @@ class ReduceTask extends Task {
         int noInMemorySegments = inMemorySegments.size();
 
         Path outputPath =
-            mapOutputFile.getInputFileForWrite(mapId, mergeOutputSize);
+            reduceTask.mapOutputFile.getInputFileForWrite(mapId, mergeOutputSize);
 
         Writer writer = 
-          new Writer(conf, rfs, outputPath,
-                     conf.getMapOutputKeyClass(),
-                     conf.getMapOutputValueClass(),
-                     codec, null);
+          new Writer(reduceTask.conf, rfs, outputPath,
+                     reduceTask.conf.getMapOutputKeyClass(),
+                     reduceTask.conf.getMapOutputValueClass(),
+                     reduceTask.codec, null);
         long decompressedBytesWritten;
         RawKeyValueIterator rIter = null;
         try {
           LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
                    " segments...");
           
-          rIter = Merger.merge(conf, rfs,
-                               (Class<K>)conf.getMapOutputKeyClass(),
-                               (Class<V>)conf.getMapOutputValueClass(),
+          rIter = Merger.merge(reduceTask.conf, rfs,
+                               (Class<K>)reduceTask.conf.getMapOutputKeyClass(),
+                               (Class<V>)reduceTask.conf.getMapOutputValueClass(),
                                inMemorySegments, inMemorySegments.size(),
                                new Path(reduceTask.getTaskID().toString()),
-                               conf.getOutputKeyComparator(), reporter,
-                               spilledRecordsCounter, null);
+                               reduceTask.conf.getOutputKeyComparator(), reporter,
+                               reduceTask.spilledRecordsCounter, null);
           
           if (combinerRunner == null) {
-            Merger.writeFile(rIter, writer, reporter, conf);
+            Merger.writeFile(rIter, writer, reporter, reduceTask.conf);
           } else {
             combineCollector.setWriter(writer);
             combinerRunner.combine(rIter, combineCollector);
@@ -2843,7 +2869,7 @@ class ReduceTask extends Task {
         FileStatus status = localFileSys.getFileStatus(outputPath);
         CompressAwareFileStatus compressedFileStatus = new CompressAwareFileStatus(
             status, decompressedBytesWritten);
-        synchronized (mapOutputFilesOnDisk) {
+        synchronized (reduceTask.mapOutputFilesOnDisk) {
           addToMapOutputFilesOnDisk(compressedFileStatus);
         }
       }
@@ -2891,7 +2917,7 @@ class ReduceTask extends Task {
             String msg = reduceTask.getTaskID()
                          + " GetMapEventsThread Ignoring exception : " 
                          + StringUtils.stringifyException(t);
-            reportFatalError(getTaskID(), t, msg);
+            reduceTask.reportFatalError(reduceTask.getTaskID(), t, msg);
           }
         } while (!exitGetMapEvents);
 
@@ -2912,7 +2938,7 @@ class ReduceTask extends Task {
           umbilical.getMapCompletionEvents(reduceTask.getJobID(), 
                                            fromEventId.get(), 
                                            MAX_EVENTS_TO_FETCH,
-                                           reduceTask.getTaskID(), jvmContext);
+                                           reduceTask.getTaskID(), reduceTask.jvmContext);
         TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();
           
         // Check if the reset is required.
@@ -2950,7 +2976,7 @@ class ReduceTask extends Task {
               URL mapOutputLocation = new URL(event.getTaskTrackerHttp() + 
                                       "/mapOutput?job=" + taskId.getJobID() +
                                       "&map=" + taskId + 
-                                      "&reduce=" + getPartition());
+                                      "&reduce=" + reduceTask.getPartition());
               List<MapOutputLocation> loc = mapLocations.get(host);
               if (loc == null) {
                 loc = Collections.synchronizedList

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java?rev=1506774&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java Wed Jul 24 23:11:17 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import java.io.IOException;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * ShuffleConsumerPlugin for serving Reducers.  It may shuffle MOF (map output) files from
+ * either the built-in provider (MapOutputServlet) or a 3rd party ShuffleProviderPlugin.
+ *
+ */
+@InterfaceAudience.LimitedPrivate("MapReduce")
+@InterfaceStability.Unstable
+public interface ShuffleConsumerPlugin {
+
+  /**
+   * Initialize this instance after it has been created by the factory.
+   */
+  public void init(Context context) throws ClassNotFoundException, IOException;
+
+  /**
+   * Fetch the output of mappers from the TaskTrackers.
+   * @return true iff successful.  In case of failure an appropriate Throwable may be available through getMergeThrowable()
+   */
+  public boolean fetchOutputs() throws IOException;
+
+  /**
+   * @return reference to a Throwable object (if the merge threw an exception)
+   */
+  public Throwable getMergeThrowable();
+
+  /**
+   * Create a RawKeyValueIterator from copied map outputs.
+   *
+   * The iterator returned must satisfy the following constraints:
+   *   1. Fewer than io.sort.factor files may be sources
+   *   2. No more than maxInMemReduce bytes of map outputs may be resident
+   *      in memory when the reduce begins
+   *
+   * If we must perform an intermediate merge to satisfy (1), then we can
+   * keep the excluded outputs from (2) in memory and include them in the
+   * first merge pass. If not, then said outputs must be written to disk
+   * first.
+   */
+  public RawKeyValueIterator createKVIterator(JobConf job, FileSystem fs, Reporter reporter) throws IOException;
+
+  /**
+   * Close and clean up any resources associated with this object.
+   */
+  public void close();
+
+  @InterfaceAudience.LimitedPrivate("MapReduce")
+  @InterfaceStability.Unstable
+  public static class Context {
+    private final ReduceTask reduceTask;
+    private final TaskUmbilicalProtocol umbilical;
+    private final JobConf conf;
+    private final TaskReporter reporter;
+
+    public Context(ReduceTask reduceTask, TaskUmbilicalProtocol umbilical, JobConf conf, TaskReporter reporter){
+      this.reduceTask = reduceTask;
+      this.umbilical = umbilical;
+      this.conf = conf;
+      this.reporter = reporter;
+    }
+
+    public ReduceTask getReduceTask() {
+      return reduceTask;
+    }
+    public JobConf getConf() {
+      return conf;
+    }
+    public TaskUmbilicalProtocol getUmbilical() {
+      return umbilical;
+    }
+    public TaskReporter getReporter() {
+      return reporter;
+    }
+  }
+}
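
To illustrate the lifecycle this interface defines (init, fetchOutputs, createKVIterator, close), a minimal hypothetical implementation could look as follows. It is a sketch only: the class is placed in org.apache.hadoop.mapred so the limited-private types used by the interface stay visible, the method bodies are placeholders, and the in-tree reference implementation remains ReduceTask$ReduceCopier. The TestShuffleConsumerPlugin stub in TestShufflePlugin.java below follows the same shape.

package org.apache.hadoop.mapred;

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;

// Hypothetical skeleton of a 3rd party shuffle consumer (sketch only).
public class MyShuffleConsumer implements ShuffleConsumerPlugin {
  private ShuffleConsumerPlugin.Context context;
  private Throwable mergeThrowable;

  @Override
  public void init(ShuffleConsumerPlugin.Context context) {
    // Keep the handles handed over by ReduceTask: the task itself, the
    // umbilical protocol, the JobConf and the TaskReporter.
    this.context = context;
  }

  @Override
  public boolean fetchOutputs() throws IOException {
    // A real plugin fetches all map outputs here (HTTP, RDMA, ...) and
    // returns false if merging failed; the cause is then exposed below.
    return true;
  }

  @Override
  public Throwable getMergeThrowable() {
    return mergeThrowable;
  }

  @Override
  public RawKeyValueIterator createKVIterator(JobConf job, FileSystem fs,
                                              Reporter reporter) throws IOException {
    // Must hand the reducer a merged, sorted iterator over the copied outputs.
    return null; // placeholder in this sketch
  }

  @Override
  public void close() {
    // Release sockets, threads and temporary files held by the plugin.
  }
}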

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleProviderPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleProviderPlugin.java?rev=1506774&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleProviderPlugin.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleProviderPlugin.java Wed Jul 24 23:11:17 2013
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This interface is implemented by objects that are able to answer shuffle requests
+ * sent by a matching shuffle consumer that lives in the context of a ReduceTask object.
+ *
+ * A ShuffleProviderPlugin object will be notified of the following events:
+ * initialize, destroy.
+ *
+ * NOTE: This interface is also used when loading 3rd party plugins at runtime.
+ *
+ */
+@InterfaceAudience.LimitedPrivate("MapReduce")
+@InterfaceStability.Unstable
+public interface ShuffleProviderPlugin {
+  /**
+   * Do constructor work here.
+   * This method is invoked by the TaskTracker constructor.
+   */
+  public void initialize(TaskTracker taskTracker);
+
+  /**
+   * Close and clean up any resources, including threads and disk space.
+   * This method is invoked by TaskTracker.shutdown.
+   */
+  public void destroy();
+}
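
A provider only has to honour the two lifecycle calls above. The following hypothetical sketch (class name and behaviour illustrative only) shows the shape; the in-tree DefaultShuffleProvider added to TaskTracker.java below registers the MapOutputServlet from exactly this hook.

package org.apache.hadoop.mapred;

// Hypothetical skeleton of a 3rd party shuffle provider (sketch only).
public class MyShuffleProvider implements ShuffleProviderPlugin {
  private TaskTracker taskTracker;

  @Override
  public void initialize(TaskTracker taskTracker) {
    // Invoked from the TaskTracker constructor; a real plugin would start its
    // own transport here and may use public accessors such as
    // taskTracker.getJobConf(jobId) and
    // TaskTracker.getIntermediateOutputDir(user, jobId, taskId)
    // to locate the map output files it has to serve.
    this.taskTracker = taskTracker;
  }

  @Override
  public void destroy() {
    // Invoked from TaskTracker.shutdown(); stop serving threads and clean up.
  }
}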

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1506774&r1=1506773&r2=1506774&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Wed Jul 24 23:11:17 2013
@@ -141,6 +141,9 @@ public class TaskTracker implements MRCo
   static final long WAIT_FOR_DONE = 3 * 1000;
   private int httpPort;
 
+  public static final String SHUFFLE_PROVIDER_PLUGIN_CLASSES = "mapreduce.shuffle.provider.plugin.classes";
+  final private ShuffleProviderPlugin shuffleProviderPlugin = new MultiShuffleProviderPlugin();
+
   static enum State {NORMAL, STALE, INTERRUPTED, DENIED}
 
   static{
@@ -233,6 +236,52 @@ public class TaskTracker implements MRCo
     }
   }
 
+  public static class DefaultShuffleProvider implements ShuffleProviderPlugin {
+    public void initialize(TaskTracker tt) {
+      tt.server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
+    }
+
+    public void destroy() {
+    }
+  }
+
+  private static class MultiShuffleProviderPlugin implements ShuffleProviderPlugin {
+
+    private ShuffleProviderPlugin[] plugins;
+
+    public void initialize(TaskTracker tt) {
+      Configuration conf = tt.getJobConf();
+      Class<?>[] klasses = conf.getClasses(SHUFFLE_PROVIDER_PLUGIN_CLASSES, DefaultShuffleProvider.class);
+
+      plugins = new ShuffleProviderPlugin[klasses.length];
+      for (int i = 0; i < klasses.length; i++) {
+        try{
+          LOG.info(" Loading ShuffleProviderPlugin: " + klasses[i]);
+          plugins[i] =  (ShuffleProviderPlugin)ReflectionUtils.newInstance(klasses[i], conf);
+          plugins[i].initialize(tt);
+        }
+        catch(Throwable t) {
+          LOG.warn("Exception instantiating/initializing a ShuffleProviderPlugin: " + klasses[i], t);
+          plugins[i] =  null;
+        }
+      }
+    }
+
+    public void destroy() {
+      if (plugins != null) {
+        for (ShuffleProviderPlugin plugin : plugins) {
+          try {
+            if (plugin != null) {
+              plugin.destroy();
+            }
+          } catch (Throwable t) {
+            LOG.warn("Exception destroying a ShuffleProviderPlugin: " + plugin, t);
+          }
+        }
+      }
+    }
+  }
+
   private LocalStorage localStorage;
   private long lastCheckDirsTime;
   private int lastNumFailures;
@@ -697,7 +746,7 @@ public class TaskTracker implements MRCo
     + TaskTracker.LOCAL_SPLIT_FILE;
   }
 
-  static String getIntermediateOutputDir(String user, String jobid,
+  public static String getIntermediateOutputDir(String user, String jobid,
       String taskid) {
     return getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR
     + TaskTracker.OUTPUT;
@@ -1433,6 +1482,14 @@ public class TaskTracker implements MRCo
   public synchronized void shutdown() throws IOException, InterruptedException {
     shuttingDown = true;
     close();
+    if (this.shuffleProviderPlugin != null) {
+      try {
+        LOG.info("Shutting down shuffleProviderPlugin");
+        this.shuffleProviderPlugin.destroy();
+      } catch (Exception e) {
+        LOG.warn("Exception shutting down shuffleProviderPlugin", e);
+      }
+    }
     if (this.server != null) {
       try {
         LOG.info("Shutting down StatusHttpServer");
@@ -1611,7 +1668,7 @@ public class TaskTracker implements MRCo
 
     server.setAttribute("shuffleExceptionTracking", shuffleExceptionTracking);
 
-    server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
+    shuffleProviderPlugin.initialize(this);
     server.addServlet("taskLog", "/tasklog", TaskLogServlet.class);
     server.start();
     this.httpPort = server.getPort();
@@ -3900,9 +3957,22 @@ public class TaskTracker implements MRCo
   }
   
   /**
+   * Get the specific job conf for a running job.
+   */
+  public JobConf getJobConf(JobID jobId) throws IOException {
+    synchronized (runningJobs) {
+      RunningJob rjob = runningJobs.get(jobId);
+      if (rjob == null) {
+        throw new IOException("Unknown job " + jobId + "!!");
+      }
+      return rjob.getJobConf();
+    }
+  }
+
+  /**
    * Get the default job conf for this tracker.
    */
-  JobConf getJobConf() {
+  public JobConf getJobConf() {
     return fConf;
   }
 
@@ -4038,16 +4108,10 @@ public class TaskTracker implements MRCo
         FileSystem rfs = ((LocalFileSystem)
             context.getAttribute("local.file.system")).getRaw();
 
-      String userName = null;
-      String runAsUserName = null;
-      synchronized (tracker.runningJobs) {
-        RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId));
-        if (rjob == null) {
-          throw new IOException("Unknown job " + jobId + "!!");
-        }
-        userName = rjob.jobConf.getUser();
-        runAsUserName = tracker.getTaskController().getRunAsUser(rjob.jobConf);
-      }
+      JobConf jobConf = tracker.getJobConf(JobID.forName(jobId));
+      String userName = jobConf.getUser();
+      String runAsUserName = tracker.getTaskController().getRunAsUser(jobConf);
+
       // Index file
       String intermediateOutputDir = TaskTracker.getIntermediateOutputDir(userName, jobId, mapId);
       String indexKey = intermediateOutputDir + "/file.out.index";

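Since mapreduce.shuffle.provider.plugin.classes is a comma-separated list, a 3rd party provider is normally added alongside the default rather than replacing it (the default is what keeps the /mapOutput servlet registered). A hedged sketch of the equivalent programmatic form follows, with com.example.MyShuffleProvider as a hypothetical extra provider; in a real deployment the same value would go into each TaskTracker's mapred-site.xml:

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskTracker;

public class ShuffleProviderConfigExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Keep the built-in HTTP provider and load one extra (hypothetical) plugin;
    // MultiShuffleProviderPlugin instantiates every listed class and skips,
    // with a warning, any entry that fails to load or initialize.
    conf.setStrings(TaskTracker.SHUFFLE_PROVIDER_PLUGIN_CLASSES,
                    TaskTracker.DefaultShuffleProvider.class.getName(),
                    "com.example.MyShuffleProvider");
  }
}
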
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java?rev=1506774&r1=1506773&r2=1506774&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java Wed Jul 24 23:11:17 2013
@@ -67,6 +67,9 @@ public class JobContext {
   public static final String USER_LOG_RETAIN_HOURS = 
     "mapred.userlog.retain.hours";
   
+  public static final String SHUFFLE_CONSUMER_PLUGIN_ATTR =
+    "mapreduce.job.reduce.shuffle.consumer.plugin.class";
+
   /**
    * The UserGroupInformation object that has a reference to the current user
    */

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java?rev=1506774&r1=1506773&r2=1506774&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java Wed Jul 24 23:11:17 2013
@@ -39,10 +39,8 @@ public class TestReduceTaskFetchFail {
     public String getJobFile() { return "/foo"; }
 
     public class TestReduceCopier extends ReduceCopier {
-      public TestReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
-                        TaskReporter reporter
-                        )throws ClassNotFoundException, IOException {
-        super(umbilical, conf, reporter);
+      public void init(ShuffleConsumerPlugin.Context context) throws ClassNotFoundException, IOException {
+        super.init(context);
       }
 
       public void checkAndInformJobTracker(int failures, TaskAttemptID mapId, boolean readError) {
@@ -69,8 +67,10 @@ public class TestReduceTaskFetchFail {
     TaskAttemptID tid =  new TaskAttemptID();
     TestReduceTask rTask = new TestReduceTask();
     rTask.setConf(conf);
+    ShuffleConsumerPlugin.Context context = new ShuffleConsumerPlugin.Context(rTask, mockUmbilical, conf, mockTaskReporter);
 
-    ReduceTask.ReduceCopier reduceCopier = rTask.new TestReduceCopier(mockUmbilical, conf, mockTaskReporter);
+    ReduceTask.ReduceCopier reduceCopier = rTask.new TestReduceCopier();
+    reduceCopier.init(context);
     reduceCopier.checkAndInformJobTracker(1, tid, false);
 
     verify(mockTaskReporter, never()).progress();
@@ -82,7 +82,9 @@ public class TestReduceTaskFetchFail {
     conf.setInt("mapreduce.reduce.shuffle.maxfetchfailures", 3);
 
     rTask.setConf(conf);
-    reduceCopier = rTask.new TestReduceCopier(mockUmbilical, conf, mockTaskReporter);
+    context = new ShuffleConsumerPlugin.Context(rTask, mockUmbilical, conf, mockTaskReporter);
+    reduceCopier = rTask.new TestReduceCopier();
+    reduceCopier.init(context);
 
     reduceCopier.checkAndInformJobTracker(1, tid, false);
     verify(mockTaskReporter, times(1)).progress();
@@ -103,7 +105,9 @@ public class TestReduceTaskFetchFail {
     conf.setBoolean("mapreduce.reduce.shuffle.notify.readerror", false);
 
     rTask.setConf(conf);
-    reduceCopier = rTask.new TestReduceCopier(mockUmbilical, conf, mockTaskReporter);
+    context = new ShuffleConsumerPlugin.Context(rTask, mockUmbilical, conf, mockTaskReporter);
+    reduceCopier = rTask.new TestReduceCopier();
+    reduceCopier.init(context);
 
     reduceCopier.checkAndInformJobTracker(7, tid, true);
     verify(mockTaskReporter, times(4)).progress();

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestShufflePlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestShufflePlugin.java?rev=1506774&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestShufflePlugin.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestShufflePlugin.java Wed Jul 24 23:11:17 2013
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.fs.LocalFileSystem;
+import java.io.IOException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.JobContext;
+
+/**
+ * A JUnit test for testing availability and accessibility of the main API that is needed
+ * by implementations of ShuffleProviderPlugin and ShuffleConsumerPlugin.
+ * The importance of this test is in preserving API compatibility with 3rd party plugins.
+ */
+public class TestShufflePlugin {
+
+  static class TestShuffleConsumerPlugin implements ShuffleConsumerPlugin {
+    @Override
+    public void init(ShuffleConsumerPlugin.Context context) {
+      // just verify that Context has kept its public interface
+      context.getReduceTask();
+      context.getConf();
+      context.getUmbilical();
+      context.getReporter();
+    }
+    @Override
+    public boolean fetchOutputs() throws IOException{
+      return true;
+    }
+    @Override
+    public Throwable getMergeThrowable(){
+      return null;
+    }
+    @Override
+    public RawKeyValueIterator createKVIterator(JobConf job, FileSystem fs, Reporter reporter) throws IOException{
+      return null;
+    }
+    @Override
+    public void close(){
+    }
+  }
+
+  @Test
+  /**
+   * A testing method instructing core hadoop to load an external ShuffleConsumerPlugin
+   * as if it came from a 3rd party.
+   */
+  public void testConsumerPluginAbility() {
+
+    try{
+      // create JobConf with mapreduce.job.reduce.shuffle.consumer.plugin.class=TestShuffleConsumerPlugin
+      JobConf jobConf = new JobConf();
+      jobConf.setClass(JobContext.SHUFFLE_CONSUMER_PLUGIN_ATTR,
+          TestShufflePlugin.TestShuffleConsumerPlugin.class,
+          ShuffleConsumerPlugin.class);
+
+      ShuffleConsumerPlugin shuffleConsumerPlugin = null;
+      Class<? extends ShuffleConsumerPlugin> clazz =
+          jobConf.getClass(JobContext.SHUFFLE_CONSUMER_PLUGIN_ATTR, null, ShuffleConsumerPlugin.class);
+      assertNotNull("Unable to get " + JobContext.SHUFFLE_CONSUMER_PLUGIN_ATTR, clazz);
+
+      // load 3rd party plugin through core's factory method
+      shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, jobConf);
+      assertNotNull("Unable to load " + JobContext.SHUFFLE_CONSUMER_PLUGIN_ATTR, shuffleConsumerPlugin);
+    }
+    catch (Exception e) {
+      assertTrue("Threw exception:" + e, false);
+    }
+  }
+
+  static class TestShuffleProviderPlugin implements ShuffleProviderPlugin {
+    @Override
+    public void initialize(TaskTracker tt) {
+    }
+    @Override
+    public void destroy(){
+    }
+  }
+
+  @Test
+  /**
+   * A testing method instructing core hadoop to load an external ShuffleProviderPlugin
+   * as if it came from a 3rd party.
+   */
+  public void testProviderPluginAbility() {
+
+    try{
+      // create JobConf with mapreduce.shuffle.provider.plugin.classes=TestShuffleProviderPlugin
+      JobConf jobConf = new JobConf();
+      jobConf.setClass(TaskTracker.SHUFFLE_PROVIDER_PLUGIN_CLASSES,
+          TestShufflePlugin.TestShuffleProviderPlugin.class,
+          ShuffleProviderPlugin.class);
+
+      ShuffleProviderPlugin shuffleProviderPlugin = null;
+      Class<? extends ShuffleProviderPlugin> clazz =
+          jobConf.getClass(TaskTracker.SHUFFLE_PROVIDER_PLUGIN_CLASSES, null, ShuffleProviderPlugin.class);
+      assertNotNull("Unable to get " + TaskTracker.SHUFFLE_PROVIDER_PLUGIN_CLASSES, clazz);
+
+      // load 3rd party plugin through core's factory method
+      shuffleProviderPlugin = ReflectionUtils.newInstance(clazz, jobConf);
+      assertNotNull("Unable to load " + TaskTracker.SHUFFLE_PROVIDER_PLUGIN_CLASSES, shuffleProviderPlugin);
+    }
+    catch (Exception e) {
+      assertTrue("Threw exception:" + e, false);
+    }
+  }
+
+  @Test
+  /**
+   * A method for testing availability and accessibility of API that is needed for sub-classes of ShuffleProviderPlugin
+   */
+  public void testProvider() {
+    //mock creation
+    ShuffleProviderPlugin mockShuffleProvider = mock(ShuffleProviderPlugin.class);
+    TaskTracker mockTT = mock(TaskTracker.class);
+    TaskController mockTaskController = mock(TaskController.class);
+
+    mockShuffleProvider.initialize(mockTT);
+    mockShuffleProvider.destroy();
+    try {
+      mockTT.getJobConf();
+      mockTT.getJobConf(mock(JobID.class));
+      mockTT.getIntermediateOutputDir("","","");
+      mockTT.getTaskController();
+      mockTaskController.getRunAsUser(mock(JobConf.class));
+    }
+    catch (Exception e){
+      assertTrue("Threw exception:" + e, false);
+    }
+  }
+
+  @Test
+  /**
+   * A method for testing availability and accessibility of API that is needed for sub-classes of ShuffleConsumerPlugin
+   */
+  public void testConsumer() {
+    //mock creation
+    ShuffleConsumerPlugin mockShuffleConsumer = mock(ShuffleConsumerPlugin.class);
+    ReduceTask mockReduceTask = mock(ReduceTask.class);
+    JobConf mockJobConf = mock(JobConf.class);
+    TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
+    TaskReporter mockReporter = mock(TaskReporter.class);
+    LocalFileSystem mockLocalFileSystem = mock(LocalFileSystem.class);
+
+    mockReduceTask.getTaskID();
+    mockReduceTask.getJobID();
+    mockReduceTask.getNumMaps();
+    mockReduceTask.getPartition();
+    mockReduceTask.getJobFile();
+    mockReduceTask.getJvmContext();
+
+    mockReporter.progress();
+
+    try {
+      String [] dirs = mockJobConf.getLocalDirs();
+      ShuffleConsumerPlugin.Context context = new ShuffleConsumerPlugin.Context(mockReduceTask, mockUmbilical, mockJobConf, mockReporter);
+      mockShuffleConsumer.init(context);
+      mockShuffleConsumer.fetchOutputs();
+      mockShuffleConsumer.createKVIterator(mockJobConf, mockLocalFileSystem.getRaw(), mockReporter);
+      mockShuffleConsumer.close();
+    }
+    catch (Exception e){
+      assertTrue("Threw exception:" + e, false);
+    }
+  }
+}