You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sa...@apache.org on 2013/08/06 08:40:43 UTC

svn commit: r1510867 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/or...

Author: sandy
Date: Tue Aug  6 06:40:42 2013
New Revision: 1510867

URL: http://svn.apache.org/r1510867
Log:
MAPREDUCE-434. LocalJobRunner limited to single reducer (Sandy Ryza and Aaron Kimball via Sandy Ryza)

Added:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
      - copied unchanged from r1510866, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Tue Aug  6 06:40:42 2013
@@ -20,6 +20,9 @@ Release 2.3.0 - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-434. LocalJobRunner limited to single reducer (Sandy Ryza and
+    Aaron Kimball via Sandy Ryza)
+
   OPTIMIZATIONS
 
     MAPREDUCE-1981. Improve getSplits performance by using listLocatedStatus

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Aug  6 06:40:42 2013
@@ -79,11 +79,15 @@ public class LocalJobRunner implements C
   public static final String LOCAL_MAX_MAPS =
     "mapreduce.local.map.tasks.maximum";
 
+  /** The maximum number of reduce tasks to run in parallel in LocalJobRunner */
+  public static final String LOCAL_MAX_REDUCES =
+    "mapreduce.local.reduce.tasks.maximum";
+
   private FileSystem fs;
   private HashMap<JobID, Job> jobs = new HashMap<JobID, Job>();
   private JobConf conf;
   private AtomicInteger map_tasks = new AtomicInteger(0);
-  private int reduce_tasks = 0;
+  private AtomicInteger reduce_tasks = new AtomicInteger(0);
   final Random rand = new Random();
   
   private LocalJobRunnerMetrics myMetrics = null;
@@ -115,9 +119,11 @@ public class LocalJobRunner implements C
     private JobConf job;
 
     private int numMapTasks;
+    private int numReduceTasks;
     private float [] partialMapProgress;
+    private float [] partialReduceProgress;
     private Counters [] mapCounters;
-    private Counters reduceCounters;
+    private Counters [] reduceCounters;
 
     private JobStatus status;
     private List<TaskAttemptID> mapIds = Collections.synchronizedList(
@@ -184,10 +190,14 @@ public class LocalJobRunner implements C
       this.start();
     }
 
+    protected abstract class RunnableWithThrowable implements Runnable {
+      public volatile Throwable storedException;
+    }
+
     /**
      * A Runnable instance that handles a map task to be run by an executor.
      */
-    protected class MapTaskRunnable implements Runnable {
+    protected class MapTaskRunnable extends RunnableWithThrowable {
       private final int taskId;
       private final TaskSplitMetaInfo info;
       private final JobID jobId;
@@ -198,8 +208,6 @@ public class LocalJobRunner implements C
       // where to fetch mapper outputs.
       private final Map<TaskAttemptID, MapOutputFile> mapOutputFiles;
 
-      public volatile Throwable storedException;
-
       public MapTaskRunnable(TaskSplitMetaInfo info, int taskId, JobID jobId,
           Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
         this.info = info;
@@ -253,12 +261,13 @@ public class LocalJobRunner implements C
      * @param mapOutputFiles a mapping from task attempts to output files
      * @return a List of Runnables, one per map task.
      */
-    protected List<MapTaskRunnable> getMapTaskRunnables(
+    protected List<RunnableWithThrowable> getMapTaskRunnables(
         TaskSplitMetaInfo [] taskInfo, JobID jobId,
         Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
 
       int numTasks = 0;
-      ArrayList<MapTaskRunnable> list = new ArrayList<MapTaskRunnable>();
+      ArrayList<RunnableWithThrowable> list =
+          new ArrayList<RunnableWithThrowable>();
       for (TaskSplitMetaInfo task : taskInfo) {
         list.add(new MapTaskRunnable(task, numTasks++, jobId,
             mapOutputFiles));
@@ -267,12 +276,89 @@ public class LocalJobRunner implements C
       return list;
     }
 
+    protected class ReduceTaskRunnable extends RunnableWithThrowable {
+      private final int taskId;
+      private final JobID jobId;
+      private final JobConf localConf;
+
+      // This is a reference to a shared object passed in by the
+      // external context; this delivers state to the reducers regarding
+      // where to fetch mapper outputs.
+      private final Map<TaskAttemptID, MapOutputFile> mapOutputFiles;
+
+      public ReduceTaskRunnable(int taskId, JobID jobId,
+          Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
+        this.taskId = taskId;
+        this.jobId = jobId;
+        this.mapOutputFiles = mapOutputFiles;
+        this.localConf = new JobConf(job);
+        this.localConf.set("mapreduce.jobtracker.address", "local");
+      }
+
+      public void run() {
+        try {
+          TaskAttemptID reduceId = new TaskAttemptID(new TaskID(
+              jobId, TaskType.REDUCE, taskId), 0);
+          LOG.info("Starting task: " + reduceId);
+
+          ReduceTask reduce = new ReduceTask(systemJobFile.toString(),
+              reduceId, taskId, mapIds.size(), 1);
+          reduce.setUser(UserGroupInformation.getCurrentUser().
+              getShortUserName());
+          setupChildMapredLocalDirs(localJobDir, reduce, localConf);
+          reduce.setLocalMapFiles(mapOutputFiles);
+
+          if (!Job.this.isInterrupted()) {
+            reduce.setJobFile(localJobFile.toString());
+            localConf.setUser(reduce.getUser());
+            reduce.localizeConfiguration(localConf);
+            reduce.setConf(localConf);
+            try {
+              reduce_tasks.getAndIncrement();
+              myMetrics.launchReduce(reduce.getTaskID());
+              reduce.run(localConf, Job.this);
+              myMetrics.completeReduce(reduce.getTaskID());
+            } finally {
+              reduce_tasks.getAndDecrement();
+            }
+
+            LOG.info("Finishing task: " + reduceId);
+          } else {
+            throw new InterruptedException();
+          }
+        } catch (Throwable t) {
+          // store this to be rethrown in the initial thread context.
+          this.storedException = t;
+        }
+      }
+    }
+
+    /**
+     * Create Runnables to encapsulate reduce tasks for use by the executor
+     * service.
+     * @param jobId the job id
+     * @param mapOutputFiles a mapping from task attempts to output files
+     * @return a List of Runnables, one per reduce task.
+     */
+    protected List<RunnableWithThrowable> getReduceTaskRunnables(
+        JobID jobId, Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
+
+      int taskId = 0;
+      ArrayList<RunnableWithThrowable> list =
+          new ArrayList<RunnableWithThrowable>();
+      for (int i = 0; i < this.numReduceTasks; i++) {
+        list.add(new ReduceTaskRunnable(taskId++, jobId, mapOutputFiles));
+      }
+
+      return list;
+    }
+
     /**
      * Initialize the counters that will hold partial-progress from
      * the various task attempts.
      * @param numMaps the number of map tasks in this job.
      */
-    private synchronized void initCounters(int numMaps) {
+    private synchronized void initCounters(int numMaps, int numReduces) {
       // Initialize state trackers for all map tasks.
       this.partialMapProgress = new float[numMaps];
       this.mapCounters = new Counters[numMaps];
@@ -280,16 +366,22 @@ public class LocalJobRunner implements C
         this.mapCounters[i] = new Counters();
       }
 
-      this.reduceCounters = new Counters();
+      this.partialReduceProgress = new float[numReduces];
+      this.reduceCounters = new Counters[numReduces];
+      for (int i = 0; i < numReduces; i++) {
+        this.reduceCounters[i] = new Counters();
+      }
+
+      this.numMapTasks = numMaps;
+      this.numReduceTasks = numReduces;
     }
 
     /**
      * Creates the executor service used to run map tasks.
      *
-     * @param numMapTasks the total number of map tasks to be run
      * @return an ExecutorService instance that handles map tasks
      */
-    protected ExecutorService createMapExecutor(int numMapTasks) {
+    protected synchronized ExecutorService createMapExecutor() {
 
       // Determine the size of the thread pool to use
       int maxMapThreads = job.getInt(LOCAL_MAX_MAPS, 1);
@@ -297,13 +389,10 @@ public class LocalJobRunner implements C
         throw new IllegalArgumentException(
             "Configured " + LOCAL_MAX_MAPS + " must be >= 1");
       }
-      this.numMapTasks = numMapTasks;
       maxMapThreads = Math.min(maxMapThreads, this.numMapTasks);
       maxMapThreads = Math.max(maxMapThreads, 1); // In case of no tasks.
 
-      initCounters(this.numMapTasks);
-
-      LOG.debug("Starting thread pool executor.");
+      LOG.debug("Starting mapper thread pool executor.");
       LOG.debug("Max local threads: " + maxMapThreads);
       LOG.debug("Map tasks to process: " + this.numMapTasks);
 
@@ -315,6 +404,65 @@ public class LocalJobRunner implements C
 
       return executor;
     }
+    
+    /**
+     * Creates the executor service used to run reduce tasks.
+     *
+     * @return an ExecutorService instance that handles reduce tasks
+     */
+    protected synchronized ExecutorService createReduceExecutor() {
+
+      // Determine the size of the thread pool to use
+      int maxReduceThreads = job.getInt(LOCAL_MAX_REDUCES, 1);
+      if (maxReduceThreads < 1) {
+        throw new IllegalArgumentException(
+            "Configured " + LOCAL_MAX_REDUCES + " must be >= 1");
+      }
+      maxReduceThreads = Math.min(maxReduceThreads, this.numReduceTasks);
+      maxReduceThreads = Math.max(maxReduceThreads, 1); // In case of no tasks.
+
+      LOG.debug("Starting reduce thread pool executor.");
+      LOG.debug("Max local threads: " + maxReduceThreads);
+      LOG.debug("Reduce tasks to process: " + this.numReduceTasks);
+
+      // Create a new executor service to drain the work queue.
+      ExecutorService executor = Executors.newFixedThreadPool(maxReduceThreads);
+
+      return executor;
+    }
+
+    /** Runs a set of tasks and waits for them to complete. */
+    private void runTasks(List<RunnableWithThrowable> runnables,
+        ExecutorService service, String taskType) throws Exception {
+      // Start populating the executor with work units.
+      // They may begin running immediately (in other threads).
+      for (Runnable r : runnables) {
+        service.submit(r);
+      }
+
+      try {
+        service.shutdown(); // Instructs queue to drain.
+
+        // Wait for tasks to finish; do not use a time-based timeout.
+        // (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024)
+        LOG.info("Waiting for " + taskType + " tasks");
+        service.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+      } catch (InterruptedException ie) {
+        // Cancel all threads.
+        service.shutdownNow();
+        throw ie;
+      }
+
+      LOG.info(taskType + " task executor complete.");
+
+      // After waiting for the tasks to complete, if any of these
+      // have thrown an exception, rethrow it now in the main thread context.
+      for (RunnableWithThrowable r : runnables) {
+        if (r.storedException != null) {
+          throw new Exception(r.storedException);
+        }
+      }
+    }
 
     private org.apache.hadoop.mapreduce.OutputCommitter 
     createOutputCommitter(boolean newApiCommitter, JobID jobId, Configuration conf) throws Exception {
@@ -360,94 +508,25 @@ public class LocalJobRunner implements C
           SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
 
         int numReduceTasks = job.getNumReduceTasks();
-        if (numReduceTasks > 1 || numReduceTasks < 0) {
-          // we only allow 0 or 1 reducer in local mode
-          numReduceTasks = 1;
-          job.setNumReduceTasks(1);
-        }
         outputCommitter.setupJob(jContext);
         status.setSetupProgress(1.0f);
 
         Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
             Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());
-
-        List<MapTaskRunnable> taskRunnables = getMapTaskRunnables(taskSplitMetaInfos,
-            jobId, mapOutputFiles);
-        ExecutorService mapService = createMapExecutor(taskRunnables.size());
-
-        // Start populating the executor with work units.
-        // They may begin running immediately (in other threads).
-        for (Runnable r : taskRunnables) {
-          mapService.submit(r);
-        }
+        
+        List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(
+            taskSplitMetaInfos, jobId, mapOutputFiles);
+              
+        initCounters(mapRunnables.size(), numReduceTasks);
+        ExecutorService mapService = createMapExecutor();
+        runTasks(mapRunnables, mapService, "map");
 
         try {
-          mapService.shutdown(); // Instructs queue to drain.
-
-          // Wait for tasks to finish; do not use a time-based timeout.
-          // (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024)
-          LOG.info("Waiting for map tasks");
-          mapService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
-        } catch (InterruptedException ie) {
-          // Cancel all threads.
-          mapService.shutdownNow();
-          throw ie;
-        }
-
-        LOG.info("Map task executor complete.");
-
-        // After waiting for the map tasks to complete, if any of these
-        // have thrown an exception, rethrow it now in the main thread context.
-        for (MapTaskRunnable r : taskRunnables) {
-          if (r.storedException != null) {
-            throw new Exception(r.storedException);
-          }
-        }
-
-        TaskAttemptID reduceId =
-          new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);
-        try {
           if (numReduceTasks > 0) {
-            ReduceTask reduce = new ReduceTask(systemJobFile.toString(), 
-                reduceId, 0, mapIds.size(), 1);
-            reduce.setUser(UserGroupInformation.getCurrentUser().
-                getShortUserName());
-            JobConf localConf = new JobConf(job);
-            localConf.set("mapreduce.jobtracker.address", "local");
-            setupChildMapredLocalDirs(localJobDir, reduce, localConf);
-            // move map output to reduce input  
-            for (int i = 0; i < mapIds.size(); i++) {
-              if (!this.isInterrupted()) {
-                TaskAttemptID mapId = mapIds.get(i);
-                Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
-                MapOutputFile localOutputFile = new MROutputFiles();
-                localOutputFile.setConf(localConf);
-                Path reduceIn =
-                  localOutputFile.getInputFileForWrite(mapId.getTaskID(),
-                        localFs.getFileStatus(mapOut).getLen());
-                if (!localFs.mkdirs(reduceIn.getParent())) {
-                  throw new IOException("Mkdirs failed to create "
-                      + reduceIn.getParent().toString());
-                }
-                if (!localFs.rename(mapOut, reduceIn))
-                  throw new IOException("Couldn't rename " + mapOut);
-              } else {
-                throw new InterruptedException();
-              }
-            }
-            if (!this.isInterrupted()) {
-              reduce.setJobFile(localJobFile.toString());
-              localConf.setUser(reduce.getUser());
-              reduce.localizeConfiguration(localConf);
-              reduce.setConf(localConf);
-              reduce_tasks += 1;
-              myMetrics.launchReduce(reduce.getTaskID());
-              reduce.run(localConf, this);
-              myMetrics.completeReduce(reduce.getTaskID());
-              reduce_tasks -= 1;
-            } else {
-              throw new InterruptedException();
-            }
+            List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(
+                jobId, mapOutputFiles);
+            ExecutorService reduceService = createReduceExecutor();
+            runTasks(reduceRunnables, reduceService, "reduce");
           }
         } finally {
           for (MapOutputFile output : mapOutputFiles.values()) {
@@ -465,7 +544,6 @@ public class LocalJobRunner implements C
         }
 
         JobEndNotifier.localRunnerNotification(job, status);
-
       } catch (Throwable t) {
         try {
           outputCommitter.abortJob(jContext, 
@@ -511,12 +589,13 @@ public class LocalJobRunner implements C
           new ByteArrayInputStream(baos.toByteArray())));
       
       LOG.info(taskStatus.getStateString());
-      int taskIndex = mapIds.indexOf(taskId);
-      if (taskIndex >= 0) {                       // mapping
+      int mapTaskIndex = mapIds.indexOf(taskId);
+      if (mapTaskIndex >= 0) {
+        // mapping
         float numTasks = (float) this.numMapTasks;
 
-        partialMapProgress[taskIndex] = taskStatus.getProgress();
-        mapCounters[taskIndex] = taskStatus.getCounters();
+        partialMapProgress[mapTaskIndex] = taskStatus.getProgress();
+        mapCounters[mapTaskIndex] = taskStatus.getCounters();
 
         float partialProgress = 0.0f;
         for (float f : partialMapProgress) {
@@ -524,8 +603,18 @@ public class LocalJobRunner implements C
         }
         status.setMapProgress(partialProgress / numTasks);
       } else {
-        reduceCounters = taskStatus.getCounters();
-        status.setReduceProgress(taskStatus.getProgress());
+        // reducing
+        int reduceTaskIndex = taskId.getTaskID().getId();
+        float numTasks = (float) this.numReduceTasks;
+
+        partialReduceProgress[reduceTaskIndex] = taskStatus.getProgress();
+        reduceCounters[reduceTaskIndex] = taskStatus.getCounters();
+
+        float partialProgress = 0.0f;
+        for (float f : partialReduceProgress) {
+          partialProgress += f;
+        }
+        status.setReduceProgress(partialProgress / numTasks);
       }
 
       // ignore phase
@@ -545,7 +634,13 @@ public class LocalJobRunner implements C
       for (Counters c : mapCounters) {
         current = Counters.sum(current, c);
       }
-      current = Counters.sum(current, reduceCounters);
+
+      if (null != reduceCounters && reduceCounters.length > 0) {
+        for (Counters c : reduceCounters) {
+          current = Counters.sum(current, c);
+        }
+      }
+
       return current;
     }
 
@@ -684,8 +779,9 @@ public class LocalJobRunner implements C
   
   public ClusterMetrics getClusterMetrics() {
     int numMapTasks = map_tasks.get();
-    return new ClusterMetrics(numMapTasks, reduce_tasks, numMapTasks,
-        reduce_tasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
+    int numReduceTasks = reduce_tasks.get();
+    return new ClusterMetrics(numMapTasks, numReduceTasks, numMapTasks,
+        numReduceTasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
   }
 
   public JobTrackerStatus getJobTrackerStatus() {
@@ -816,6 +912,27 @@ public class LocalJobRunner implements C
     return job.getConfiguration().getInt(LOCAL_MAX_MAPS, 1);
   }
 
+
+  /**
+   * Set the max number of reduce tasks to run concurrently in the LocalJobRunner.
+   * @param job the job to configure
+   * @param maxReduces the maximum number of reduce tasks to allow.
+   */
+  public static void setLocalMaxRunningReduces(
+      org.apache.hadoop.mapreduce.JobContext job,
+      int maxReduces) {
+    job.getConfiguration().setInt(LOCAL_MAX_REDUCES, maxReduces);
+  }
+
+  /**
+   * @return the max number of reduce tasks to run concurrently in the
+   * LocalJobRunner.
+   */
+  public static int getLocalMaxRunningReduces(
+      org.apache.hadoop.mapreduce.JobContext job) {
+    return job.getConfiguration().getInt(LOCAL_MAX_REDUCES, 1);
+  }
+
   @Override
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
                                        ) throws IOException,

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Tue Aug  6 06:40:42 2013
@@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.JobCo
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
 import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
@@ -1860,7 +1861,6 @@ public class MapTask extends Task {
       }
       {
         sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
-        Merger.considerFinalMergeForProgress();
         
         IndexRecord rec = new IndexRecord();
         final SpillRecord spillRec = new SpillRecord(partitions);
@@ -1893,7 +1893,8 @@ public class MapTask extends Task {
                          segmentList, mergeFactor,
                          new Path(mapId.toString()),
                          job.getOutputKeyComparator(), reporter, sortSegments,
-                         null, spilledRecordsCounter, sortPhase.phase());
+                         null, spilledRecordsCounter, sortPhase.phase(),
+                         TaskType.MAP);
 
           //write merged output to disk
           long segmentStart = finalOut.getPos();

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java Tue Aug  6 06:40:42 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.io.compress.Com
 import org.apache.hadoop.mapred.IFile.Reader;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.util.PriorityQueue;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
@@ -69,7 +70,8 @@ public class Merger {  
   throws IOException {
     return 
       new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator, 
-                           reporter, null).merge(keyClass, valueClass,
+                           reporter, null,
+                           TaskType.REDUCE).merge(keyClass, valueClass,
                                            mergeFactor, tmpDir,
                                            readsCounter, writesCounter, 
                                            mergePhase);
@@ -90,7 +92,8 @@ public class Merger {  
   throws IOException {
     return 
       new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator, 
-                           reporter, mergedMapOutputsCounter).merge(
+                           reporter, mergedMapOutputsCounter,
+                           TaskType.REDUCE).merge(
                                            keyClass, valueClass,
                                            mergeFactor, tmpDir,
                                            readsCounter, writesCounter,
@@ -124,7 +127,8 @@ public class Merger {  
                             Progress mergePhase)
       throws IOException {
     return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
-                           sortSegments).merge(keyClass, valueClass,
+                           sortSegments,
+                           TaskType.REDUCE).merge(keyClass, valueClass,
                                                mergeFactor, tmpDir,
                                                readsCounter, writesCounter,
                                                mergePhase);
@@ -140,10 +144,12 @@ public class Merger {  
                             boolean sortSegments,
                             Counters.Counter readsCounter,
                             Counters.Counter writesCounter,
-                            Progress mergePhase)
+                            Progress mergePhase,
+                            TaskType taskType)
       throws IOException {
     return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
-                           sortSegments, codec).merge(keyClass, valueClass,
+                           sortSegments, codec,
+                           taskType).merge(keyClass, valueClass,
                                                mergeFactor, tmpDir,
                                                readsCounter, writesCounter,
                                                mergePhase);
@@ -161,7 +167,8 @@ public class Merger {  
                             Progress mergePhase)
       throws IOException {
     return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
-                           sortSegments).merge(keyClass, valueClass,
+                           sortSegments,
+                           TaskType.REDUCE).merge(keyClass, valueClass,
                                                mergeFactor, inMemSegments,
                                                tmpDir,
                                                readsCounter, writesCounter,
@@ -182,7 +189,8 @@ public class Merger {  
                           Progress mergePhase)
     throws IOException {
   return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
-                         sortSegments, codec).merge(keyClass, valueClass,
+                         sortSegments, codec,
+                         TaskType.REDUCE).merge(keyClass, valueClass,
                                              mergeFactor, inMemSegments,
                                              tmpDir,
                                              readsCounter, writesCounter,
@@ -366,20 +374,7 @@ public class Merger {  
       }
     }
   }
-  
-  // Boolean variable for including/considering final merge as part of sort
-  // phase or not. This is true in map task, false in reduce task. It is
-  // used in calculating mergeProgress.
-  static boolean includeFinalMerge = false;
-  
-  /**
-   * Sets the boolean variable includeFinalMerge to true. Called from
-   * map task before calling merge() so that final merge of map task
-   * is also considered as part of sort phase.
-   */
-  static void considerFinalMergeForProgress() {
-    includeFinalMerge = true;
-  }
+
   
   private static class MergeQueue<K extends Object, V extends Object> 
   extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator {
@@ -401,6 +396,21 @@ public class Merger {  
     final DataInputBuffer value = new DataInputBuffer();
     final DataInputBuffer diskIFileValue = new DataInputBuffer();
     
+    
+    // Boolean variable for including/considering final merge as part of sort
+    // phase or not. This is true in map task, false in reduce task. It is
+    // used in calculating mergeProgress.
+    private boolean includeFinalMerge = false;
+    
+    /**
+     * Sets the boolean variable includeFinalMerge to true. Called from
+     * the MergeQueue constructors when the task type is MAP, so that the
+     * final merge of a map task is also considered as part of sort phase.
+     */
+    private void considerFinalMergeForProgress() {
+      includeFinalMerge = true;
+    }    
+    
     Segment<K, V> minSegment;
     Comparator<Segment<K, V>> segmentComparator =   
       new Comparator<Segment<K, V>>() {
@@ -419,14 +429,16 @@ public class Merger {  
                       CompressionCodec codec, RawComparator<K> comparator,
                       Progressable reporter) 
     throws IOException {
-      this(conf, fs, inputs, deleteInputs, codec, comparator, reporter, null);
+      this(conf, fs, inputs, deleteInputs, codec, comparator, reporter, null,
+          TaskType.REDUCE);
     }
     
     public MergeQueue(Configuration conf, FileSystem fs, 
                       Path[] inputs, boolean deleteInputs, 
                       CompressionCodec codec, RawComparator<K> comparator,
                       Progressable reporter, 
-                      Counters.Counter mergedMapOutputsCounter) 
+                      Counters.Counter mergedMapOutputsCounter,
+                      TaskType taskType) 
     throws IOException {
       this.conf = conf;
       this.fs = fs;
@@ -434,6 +446,10 @@ public class Merger {  
       this.comparator = comparator;
       this.reporter = reporter;
       
+      if (taskType == TaskType.MAP) {
+        considerFinalMergeForProgress();
+      }
+      
       for (Path file : inputs) {
         LOG.debug("MergeQ: adding: " + file);
         segments.add(new Segment<K, V>(conf, fs, file, codec, !deleteInputs, 
@@ -449,17 +465,20 @@ public class Merger {  
     public MergeQueue(Configuration conf, FileSystem fs,
         List<Segment<K, V>> segments, RawComparator<K> comparator,
         Progressable reporter) {
-      this(conf, fs, segments, comparator, reporter, false);
+      this(conf, fs, segments, comparator, reporter, false, TaskType.REDUCE);
     }
 
     public MergeQueue(Configuration conf, FileSystem fs, 
         List<Segment<K, V>> segments, RawComparator<K> comparator,
-        Progressable reporter, boolean sortSegments) {
+        Progressable reporter, boolean sortSegments, TaskType taskType) {
       this.conf = conf;
       this.fs = fs;
       this.comparator = comparator;
       this.segments = segments;
       this.reporter = reporter;
+      if (taskType == TaskType.MAP) {
+        considerFinalMergeForProgress();
+      }
       if (sortSegments) {
         Collections.sort(segments, segmentComparator);
       }
@@ -467,8 +486,10 @@ public class Merger {  
 
     public MergeQueue(Configuration conf, FileSystem fs,
         List<Segment<K, V>> segments, RawComparator<K> comparator,
-        Progressable reporter, boolean sortSegments, CompressionCodec codec) {
-      this(conf, fs, segments, comparator, reporter, sortSegments);
+        Progressable reporter, boolean sortSegments, CompressionCodec codec,
+        TaskType taskType) {
+      this(conf, fs, segments, comparator, reporter, sortSegments,
+          taskType);
       this.codec = codec;
     }
 

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java Tue Aug  6 06:40:42 2013
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
@@ -74,6 +75,10 @@ public class ReduceTask extends Task {
 
   private CompressionCodec codec;
 
+  // If this is a LocalJobRunner-based job, this will
+  // be a mapping from map task attempts to their output files.
+  // This will be null in other cases.
+  private Map<TaskAttemptID, MapOutputFile> localMapFiles;
 
   { 
     getProgress().setStatus("reduce"); 
@@ -105,24 +110,24 @@ public class ReduceTask extends Task {
   // file paths, the first parameter is considered smaller than the second one.
   // In case of files with same size and path are considered equal.
   private Comparator<FileStatus> mapOutputFileComparator = 
-    new Comparator<FileStatus>() {
-      public int compare(FileStatus a, FileStatus b) {
-        if (a.getLen() < b.getLen())
-          return -1;
-        else if (a.getLen() == b.getLen())
-          if (a.getPath().toString().equals(b.getPath().toString()))
-            return 0;
-          else
-            return -1; 
+      new Comparator<FileStatus>() {
+    public int compare(FileStatus a, FileStatus b) {
+      if (a.getLen() < b.getLen())
+        return -1;
+      else if (a.getLen() == b.getLen())
+        if (a.getPath().toString().equals(b.getPath().toString()))
+          return 0;
         else
-          return 1;
-      }
+          return -1; 
+      else
+        return 1;
+    }
   };
-  
+
   // A sorted set for keeping a set of map output files on disk
   private final SortedSet<FileStatus> mapOutputFilesOnDisk = 
-    new TreeSet<FileStatus>(mapOutputFileComparator);
-
+      new TreeSet<FileStatus>(mapOutputFileComparator);
+  
   public ReduceTask() {
     super();
   }
@@ -133,6 +138,17 @@ public class ReduceTask extends Task {
     this.numMaps = numMaps;
   }
   
+
+  /**
+   * Register the set of mapper outputs created by a LocalJobRunner-based
+   * job with this ReduceTask so it knows where to fetch from.
+   *
+   * This should not be called in normal (networked) execution.
+   */
+  public void setLocalMapFiles(Map<TaskAttemptID, MapOutputFile> mapFiles) {
+    this.localMapFiles = mapFiles;
+  }
+
   private CompressionCodec initCodec() {
     // check if map-outputs are to be compressed
     if (conf.getCompressMapOutput()) {
@@ -174,20 +190,11 @@ public class ReduceTask extends Task {
     numMaps = in.readInt();
   }
   
-  // Get the input files for the reducer.
-  private Path[] getMapFiles(FileSystem fs, boolean isLocal) 
-  throws IOException {
+  // Get the input files for the reducer (for local jobs).
+  private Path[] getMapFiles(FileSystem fs) throws IOException {
     List<Path> fileList = new ArrayList<Path>();
-    if (isLocal) {
-      // for local jobs
-      for(int i = 0; i < numMaps; ++i) {
-        fileList.add(mapOutputFile.getInputFile(i));
-      }
-    } else {
-      // for non local jobs
-      for (FileStatus filestatus : mapOutputFilesOnDisk) {
-        fileList.add(filestatus.getPath());
-      }
+    for(int i = 0; i < numMaps; ++i) {
+      fileList.add(mapOutputFile.getInputFile(i));
     }
     return fileList.toArray(new Path[0]);
   }
@@ -341,56 +348,33 @@ public class ReduceTask extends Task {
     // Initialize the codec
     codec = initCodec();
     RawKeyValueIterator rIter = null;
-    ShuffleConsumerPlugin shuffleConsumerPlugin = null; 
-    
-    boolean isLocal = false; 
-    // local if
-    // 1) framework == local or
-    // 2) framework == null and job tracker address == local
-    String framework = job.get(MRConfig.FRAMEWORK_NAME);
-    String masterAddr = job.get(MRConfig.MASTER_ADDRESS, "local");
-    if ((framework == null && masterAddr.equals("local"))
-        || (framework != null && framework.equals(MRConfig.LOCAL_FRAMEWORK_NAME))) {
-      isLocal = true;
-    }
+    ShuffleConsumerPlugin shuffleConsumerPlugin = null;
     
-    if (!isLocal) {
-      Class combinerClass = conf.getCombinerClass();
-      CombineOutputCollector combineCollector = 
-        (null != combinerClass) ? 
- 	     new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
-
-      Class<? extends ShuffleConsumerPlugin> clazz =
-            job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
-						
-      shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
-      LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
-
-      ShuffleConsumerPlugin.Context shuffleContext = 
-        new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, 
-                    super.lDirAlloc, reporter, codec, 
-                    combinerClass, combineCollector, 
-                    spilledRecordsCounter, reduceCombineInputCounter,
-                    shuffledMapsCounter,
-                    reduceShuffleBytes, failedShuffleCounter,
-                    mergedMapOutputsCounter,
-                    taskStatus, copyPhase, sortPhase, this,
-                    mapOutputFile);
-      shuffleConsumerPlugin.init(shuffleContext);
-      rIter = shuffleConsumerPlugin.run();
-    } else {
-      // local job runner doesn't have a copy phase
-      copyPhase.complete();
-      final FileSystem rfs = FileSystem.getLocal(job).getRaw();
-      rIter = Merger.merge(job, rfs, job.getMapOutputKeyClass(),
-                           job.getMapOutputValueClass(), codec, 
-                           getMapFiles(rfs, true),
-                           !conf.getKeepFailedTaskFiles(), 
-                           job.getInt(JobContext.IO_SORT_FACTOR, 100),
-                           new Path(getTaskID().toString()), 
-                           job.getOutputKeyComparator(),
-                           reporter, spilledRecordsCounter, null, null);
-    }
+    Class combinerClass = conf.getCombinerClass();
+    CombineOutputCollector combineCollector = 
+      (null != combinerClass) ? 
+     new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
+
+    Class<? extends ShuffleConsumerPlugin> clazz =
+          job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
+					
+    shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
+    LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
+
+    ShuffleConsumerPlugin.Context shuffleContext = 
+      new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, 
+                  super.lDirAlloc, reporter, codec, 
+                  combinerClass, combineCollector, 
+                  spilledRecordsCounter, reduceCombineInputCounter,
+                  shuffledMapsCounter,
+                  reduceShuffleBytes, failedShuffleCounter,
+                  mergedMapOutputsCounter,
+                  taskStatus, copyPhase, sortPhase, this,
+                  mapOutputFile, localMapFiles);
+    shuffleConsumerPlugin.init(shuffleContext);
+
+    rIter = shuffleConsumerPlugin.run();
+
     // free up the data structures
     mapOutputFilesOnDisk.clear();
     
@@ -409,9 +393,7 @@ public class ReduceTask extends Task {
                     keyClass, valueClass);
     }
 
-    if (shuffleConsumerPlugin != null) {
-      shuffleConsumerPlugin.close();
-    }
+    shuffleConsumerPlugin.close();
     done(umbilical, reporter);
   }
 

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java Tue Aug  6 06:40:42 2013
@@ -19,6 +19,8 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.Map;
+
 import org.apache.hadoop.mapred.Task.CombineOutputCollector;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -65,6 +67,7 @@ public interface ShuffleConsumerPlugin<K
     private final Progress mergePhase;
     private final Task reduceTask;
     private final MapOutputFile mapOutputFile;
+    private final Map<TaskAttemptID, MapOutputFile> localMapFiles;
 
     public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId,
                    JobConf jobConf, FileSystem localFS,
@@ -80,7 +83,8 @@ public interface ShuffleConsumerPlugin<K
                    Counters.Counter failedShuffleCounter,
                    Counters.Counter mergedMapOutputsCounter,
                    TaskStatus status, Progress copyPhase, Progress mergePhase,
-                   Task reduceTask, MapOutputFile mapOutputFile) {
+                   Task reduceTask, MapOutputFile mapOutputFile,
+                   Map<TaskAttemptID, MapOutputFile> localMapFiles) {
       this.reduceId = reduceId;
       this.jobConf = jobConf;
       this.localFS = localFS;
@@ -101,6 +105,7 @@ public interface ShuffleConsumerPlugin<K
       this.mergePhase = mergePhase;
       this.reduceTask = reduceTask;
       this.mapOutputFile = mapOutputFile;
+      this.localMapFiles = localMapFiles;
     }
 
     public org.apache.hadoop.mapreduce.TaskAttemptID getReduceId() {
@@ -163,6 +168,9 @@ public interface ShuffleConsumerPlugin<K
     public MapOutputFile getMapOutputFile() {
       return mapOutputFile;
     }
+    public Map<TaskAttemptID, MapOutputFile> getLocalMapFiles() {
+      return localMapFiles;
+    }
   } // end of public static class Context<K,V>
 
 }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Tue Aug  6 06:40:42 2013
@@ -60,7 +60,7 @@ class Fetcher<K,V> extends Thread {
   /* Default read timeout (in milliseconds) */
   private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
 
-  private final Reporter reporter;
+  protected final Reporter reporter;
   private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
                                     CONNECTION, WRONG_REDUCE}
   
@@ -71,13 +71,13 @@ class Fetcher<K,V> extends Thread {
   private final Counters.Counter badIdErrs;
   private final Counters.Counter wrongMapErrs;
   private final Counters.Counter wrongReduceErrs;
-  private final MergeManager<K,V> merger;
-  private final ShuffleSchedulerImpl<K,V> scheduler;
-  private final ShuffleClientMetrics metrics;
-  private final ExceptionReporter exceptionReporter;
-  private final int id;
+  protected final MergeManager<K,V> merger;
+  protected final ShuffleSchedulerImpl<K,V> scheduler;
+  protected final ShuffleClientMetrics metrics;
+  protected final ExceptionReporter exceptionReporter;
+  protected final int id;
   private static int nextId = 0;
-  private final int reduce;
+  protected final int reduce;
   
   private final int connectionTimeout;
   private final int readTimeout;

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Tue Aug  6 06:40:42 2013
@@ -18,10 +18,12 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.Task;
@@ -56,6 +58,7 @@ public class Shuffle<K, V> implements Sh
   private Progress copyPhase;
   private TaskStatus taskStatus;
   private Task reduceTask; //Used for status updates
+  private Map<TaskAttemptID, MapOutputFile> localMapFiles;
 
   @Override
   public void init(ShuffleConsumerPlugin.Context context) {
@@ -69,6 +72,7 @@ public class Shuffle<K, V> implements Sh
     this.copyPhase = context.getCopyPhase();
     this.taskStatus = context.getStatus();
     this.reduceTask = context.getReduceTask();
+    this.localMapFiles = context.getLocalMapFiles();
     
     scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
         this, copyPhase, context.getShuffledMapsCounter(),
@@ -103,13 +107,22 @@ public class Shuffle<K, V> implements Sh
     eventFetcher.start();
     
     // Start the map-output fetcher threads
-    final int numFetchers = jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
+    boolean isLocal = localMapFiles != null;
+    final int numFetchers = isLocal ? 1 :
+      jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
     Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
-    for (int i=0; i < numFetchers; ++i) {
-      fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 
-                                     reporter, metrics, this, 
-                                     reduceTask.getShuffleSecret());
-      fetchers[i].start();
+    if (isLocal) {
+      fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
+          merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
+          localMapFiles);
+      fetchers[0].start();
+    } else {
+      for (int i=0; i < numFetchers; ++i) {
+        fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 
+                                       reporter, metrics, this, 
+                                       reduceTask.getShuffleSecret());
+        fetchers[i].start();
+      }
     }
     
     // Wait for shuffle to complete successfully

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java Tue Aug  6 06:40:42 2013
@@ -155,7 +155,7 @@ public class TestShufflePlugin<K, V> {
                                                 mockCounter, mockCounter, mockCounter,
                                                 mockCounter, mockCounter, mockCounter,
                                                 mockTaskStatus, mockProgress, mockProgress,
-                                                mockTask, mockMapOutputFile);
+                                                mockTask, mockMapOutputFile, null);
       shuffleConsumerPlugin.init(context);
       shuffleConsumerPlugin.run();
       shuffleConsumerPlugin.close();

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java Tue Aug  6 06:40:42 2013
@@ -276,18 +276,16 @@ public class TestJobCounters {
     // there are too few spills to combine (2 < 3)
     // Each map spills 2^14 records, so maps spill 49152 records, combined.
 
-    // The reduce spill count is composed of the read from one segment and
-    // the intermediate merge of the other two. The intermediate merge
+    // The combiner has emitted 24576 records to the reducer; these are all
+    // fetched straight to memory from the map side. The intermediate merge
     // adds 8192 records per segment read; again, there are too few spills to
-    // combine, so all 16834 are written to disk (total 32768 spilled records
-    // for the intermediate merge). The merge into the reduce includes only
-    // the unmerged segment, size 8192. Total spilled records in the reduce
-    // is 32768 from the merge + 8192 unmerged segment = 40960 records
+    // combine. Total spilled records in the reduce
+    // is 8192 records / map * 3 maps = 24576.
 
-    // Total: map + reduce = 49152 + 40960 = 90112
+    // Total: map + reduce = 49152 + 24576 = 73728
     // 3 files, 5120 = 5 * 1024 rec/file = 15360 input records
     // 4 records/line = 61440 output records
-    validateCounters(c1, 90112, 15360, 61440);
+    validateCounters(c1, 73728, 15360, 61440);
     validateFileCounters(c1, inputSize, 0, 0, 0);
     validateOldFileCounters(c1, inputSize, 61928, 0, 0);
   }
@@ -316,12 +314,12 @@ public class TestJobCounters {
     // 1st merge: read + write = 8192 * 4
     // 2nd merge: read + write = 8192 * 4
     // final merge: 0
-    // Total reduce: 65536
+    // Total reduce: 32768
 
-    // Total: map + reduce = 2^16 + 2^16 = 131072
+    // Total: map + reduce = 2^16 + 2^15 = 98304
     // 4 files, 5120 = 5 * 1024 rec/file = 15360 input records
     // 4 records/line = 81920 output records
-    validateCounters(c1, 131072, 20480, 81920);
+    validateCounters(c1, 98304, 20480, 81920);
     validateFileCounters(c1, inputSize, 0, 0, 0);
   }
 
@@ -349,7 +347,7 @@ public class TestJobCounters {
     // Total reduce: 45056
     // 5 files, 5120 = 5 * 1024 rec/file = 15360 input records
     // 4 records/line = 102400 output records
-    validateCounters(c1, 147456, 25600, 102400);
+    validateCounters(c1, 122880, 25600, 102400);
     validateFileCounters(c1, inputSize, 0, 0, 0);
   }
 
@@ -394,7 +392,7 @@ public class TestJobCounters {
         job, new Path(OUT_DIR, "outputN0"));
     assertTrue(job.waitForCompletion(true));
     final Counters c1 = Counters.downgrade(job.getCounters());
-    validateCounters(c1, 90112, 15360, 61440);
+    validateCounters(c1, 73728, 15360, 61440);
     validateFileCounters(c1, inputSize, 0, 0, 0);    
   }
 
@@ -416,7 +414,7 @@ public class TestJobCounters {
         job, new Path(OUT_DIR, "outputN1"));
     assertTrue(job.waitForCompletion(true));
     final Counters c1 = Counters.downgrade(job.getCounters());
-    validateCounters(c1, 131072, 20480, 81920);
+    validateCounters(c1, 98304, 20480, 81920);
     validateFileCounters(c1, inputSize, 0, 0, 0);
   }
 
@@ -439,7 +437,7 @@ public class TestJobCounters {
         job, new Path(OUT_DIR, "outputN2"));
     assertTrue(job.waitForCompletion(true));
     final Counters c1 = Counters.downgrade(job.getCounters());
-    validateCounters(c1, 147456, 25600, 102400);
+    validateCounters(c1, 122880, 25600, 102400);
     validateFileCounters(c1, inputSize, 0, 0, 0);
   }
 

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java Tue Aug  6 06:40:42 2013
@@ -63,7 +63,7 @@ public class TestKeyFieldBasedComparator
     conf.setOutputValueClass(LongWritable.class);
 
     conf.setNumMapTasks(1);
-    conf.setNumReduceTasks(2);
+    conf.setNumReduceTasks(1);
 
     conf.setOutputFormat(TextOutputFormat.class);
     conf.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
@@ -101,9 +101,7 @@ public class TestKeyFieldBasedComparator
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));
       String line = reader.readLine();
       //make sure we get what we expect as the first line, and also
-      //that we have two lines (both the lines must end up in the same
-      //reducer since the partitioner takes the same key spec for all
-      //lines
+      //that we have two lines
       if (expect == 1) {
         assertTrue(line.startsWith(line1));
       } else if (expect == 2) {

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java Tue Aug  6 06:40:42 2013
@@ -31,9 +31,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.mapred.LocalJobRunner;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -410,6 +410,7 @@ public class TestLocalRunner extends Tes
   }
 
   /** Test case for zero mappers */
+  @Test
   public void testEmptyMaps() throws Exception {
     Job job = Job.getInstance();
     Path outputPath = getOutputPath();
@@ -428,5 +429,145 @@ public class TestLocalRunner extends Tes
     boolean success = job.waitForCompletion(true);
     assertTrue("Empty job should work", success);
   }
+
+  /** @return the mapper input directory, "numberfiles" under the input path. */
+  private Path getNumberDirPath() {
+    return new Path(getInputPath(), "numberfiles");
+  }
+
+  /**
+   * Write a number file, "file" + fileNum, containing a single integer.
+   *
+   * @param fileNum the file number, used as the suffix of the file name.
+   * @param value the value to write to the file
+   * @return the path of the written file.
+   * @throws IOException if the file cannot be created or written.
+   */
+  private Path makeNumberFile(int fileNum, int value) throws IOException {
+    Path filePath = new Path(getNumberDirPath(), "file" + fileNum);
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    OutputStream os = fs.create(filePath);
+    BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+    try {
+      w.write(Integer.toString(value));
+    } finally {
+      // Close even when the write fails so the file handle is not leaked.
+      w.close();
+    }
+    return filePath;
+  }
+
+  /**
+   * Each record received by this mapper is a number 'n'.
+   * Emit the values [0..n-1] as Text keys with NullWritable values.
+   */
+  public static class SequenceMapper
+      extends Mapper<LongWritable, Text, Text, NullWritable> {
+    @Override
+    public void map(LongWritable k, Text v, Context c)
+        throws IOException, InterruptedException {
+      int max = Integer.parseInt(v.toString()); // parse to int without boxing
+      for (int i = 0; i < max; i++) {
+        c.write(new Text(Integer.toString(i)), NullWritable.get());
+      }
+    }
+  }
+
+  private final static int NUMBER_FILE_VAL = 100; // values emitted per mapper
+
+  /**
+   * Tally up the values in every output file and ensure that we got as
+   * much data out as we put in. Each mapper generated 'NUMBER_FILE_VAL'
+   * values (0..NUMBER_FILE_VAL-1), so the sum across all reducers' output
+   * must equal numMaps times the sum of that range.
+   * @param numMaps number of mappers whose output is expected.
+   */
+  private void verifyNumberJob(int numMaps) throws Exception {
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+
+    int valueSum = 0;
+    for (FileStatus f : fs.listStatus(getOutputPath())) {
+      FSDataInputStream istream = fs.open(f.getPath());
+      BufferedReader r = new BufferedReader(new InputStreamReader(istream));
+      try {
+        String line;
+        while ((line = r.readLine()) != null) {
+          valueSum += Integer.parseInt(line.trim());
+        }
+      } finally {
+        r.close(); // release the stream even if a line fails to parse
+      }
+    }
+
+    int maxVal = NUMBER_FILE_VAL - 1;
+    int expectedPerMapper = maxVal * (maxVal + 1) / 2;
+    int expectedSum = expectedPerMapper * numMaps;
+    LOG.info("expected sum: " + expectedSum + ", got " + valueSum);
+    assertEquals("Didn't get all our results back", expectedSum, valueSum);
+  }
+
+  /**
+   * Run a SequenceMapper job, with no reducer class set, over a set of
+   * generated number files and verify the summed output.
+   * @param numMaps number of input number files to generate.
+   * @param numReduces number of reduce tasks to configure on the job.
+   * @param parallelMaps value passed to setLocalMaxRunningMaps.
+   * @param parallelReduces value passed to setLocalMaxRunningReduces.
+   */
+  private void doMultiReducerTest(int numMaps, int numReduces,
+      int parallelMaps, int parallelReduces) throws Exception {
+    Path in = getNumberDirPath();
+    Path out = getOutputPath();
+
+    // Clear data from any previous tests.
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    if (fs.exists(out)) {
+      fs.delete(out, true);
+    }
+    if (fs.exists(in)) {
+      fs.delete(in, true);
+    }
+
+    // Write the same constant that verifyNumberJob bases its expected sum on.
+    for (int i = 0; i < numMaps; i++) {
+      makeNumberFile(i, NUMBER_FILE_VAL);
+    }
+
+    Job job = Job.getInstance();
+    job.setNumReduceTasks(numReduces);
+    job.setMapperClass(SequenceMapper.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(NullWritable.class);
+    FileInputFormat.addInputPath(job, in);
+    FileOutputFormat.setOutputPath(job, out);
+
+    LocalJobRunner.setLocalMaxRunningMaps(job, parallelMaps);
+    LocalJobRunner.setLocalMaxRunningReduces(job, parallelReduces);
+
+    boolean result = job.waitForCompletion(true);
+    assertTrue("Job failed!!", result);
+    verifyNumberJob(numMaps);
+  }
+  
+  @Test
+  public void testOneMapMultiReduce() throws Exception {
+    doMultiReducerTest(1, 2, 1, 1); // maps=1 reduces=2 maxMaps=1 maxReduces=1
+  }
+
+  @Test
+  public void testOneMapMultiParallelReduce() throws Exception {
+    doMultiReducerTest(1, 2, 1, 2); // maps=1 reduces=2 maxMaps=1 maxReduces=2
+  }
+
+  @Test
+  public void testMultiMapOneReduce() throws Exception {
+    doMultiReducerTest(4, 1, 2, 1); // maps=4 reduces=1 maxMaps=2 maxReduces=1
+  }
+
+  @Test
+  public void testMultiMapMultiReduce() throws Exception {
+    doMultiReducerTest(4, 4, 2, 2); // maps=4 reduces=4 maxMaps=2 maxReduces=2
+  }
+
 }
 

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java Tue Aug  6 06:40:42 2013
@@ -56,7 +56,7 @@ public class TestMRKeyFieldBasedComparat
     conf.set("mapreduce.partition.keypartitioner.options", "-k1.1,1.1");
     conf.set(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, " ");
 
-    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 2,
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 1,
                 line1 +"\n" + line2 + "\n"); 
     job.setMapperClass(InverseMapper.class);
     job.setReducerClass(Reducer.class);