Posted to mapreduce-commits@hadoop.apache.org by cm...@apache.org on 2013/08/13 23:20:00 UTC

svn commit: r1513658 - in /hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/ma...

Author: cmccabe
Date: Tue Aug 13 21:19:53 2013
New Revision: 1513658

URL: http://svn.apache.org/r1513658
Log:
merge trunk into HDFS-4949 branch

Added:
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
      - copied unchanged from r1512447, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
Modified:
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/dancing/package.html   (props changed)
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/package.html   (props changed)
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/math/package.html   (props changed)
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/package.html   (props changed)
    hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/package.html   (props changed)

Propchange: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1509426-1512447

Modified: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/CHANGES.txt?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/CHANGES.txt Tue Aug 13 21:19:53 2013
@@ -157,6 +157,9 @@ Release 2.3.0 - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-434. LocalJobRunner limited to single reducer (Sandy Ryza and
+    Aaron Kimball via Sandy Ryza)
+
   OPTIMIZATIONS
 
     MAPREDUCE-1981. Improve getSplits performance by using listLocatedStatus
@@ -167,24 +170,12 @@ Release 2.3.0 - UNRELEASED
     MAPREDUCE-5316. job -list-attempt-ids command does not handle illegal
     task-state (Ashwin Shankar via jlowe)
 
-    MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the
-    input path dir (Devaraj K via jlowe)
-
-    MAPREDUCE-5358. MRAppMaster throws invalid transitions for JobImpl
-    (Devaraj K via jlowe)
-
     MAPREDUCE-5380. Invalid mapred command should return non-zero exit code
     (Stephen Chu via jlowe)
 
     MAPREDUCE-5404. HSAdminServer does not use ephemeral ports in minicluster
     mode (Ted Yu via jlowe)
 
-    MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via
-    jlowe)
-
-    MAPREDUCE-5251. Reducer should not implicate map attempt if it has
-    insufficient space to fetch map output (Ashwin Shankar via jlowe)
-
 Release 2.1.1-beta - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -198,11 +189,37 @@ Release 2.1.1-beta - UNRELEASED
     MAPREDUCE-5352. Optimize node local splits generated by
     CombineFileInputFormat. (sseth)
 
+    MAPREDUCE-5446. TestJobHistoryEvents and TestJobHistoryParsing have race
+    conditions (jlowe via kihwal)
+
   BUG FIXES
 
     MAPREDUCE-5385. Fixed a bug with JobContext getCacheFiles API. (Omkar Vinit
     Joshi via vinodkv)
 
+    MAPREDUCE-5428.  HistoryFileManager doesn't stop threads when service is
+    stopped (Karthik Kambatla via jlowe)
+
+    MAPREDUCE-5251. Reducer should not implicate map attempt if it has
+    insufficient space to fetch map output (Ashwin Shankar via jlowe)
+
+    MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via
+    jlowe)
+
+    MAPREDUCE-5358. MRAppMaster throws invalid transitions for JobImpl
+    (Devaraj K via jlowe)
+
+    MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the
+    input path dir (Devaraj K via jlowe)
+
+    MAPREDUCE-5440. TestCopyCommitter Fails on JDK7 (Robert Parker via jlowe)
+
+    MAPREDUCE-5367. Local jobs all use same local working directory
+    (Sandy Ryza)
+
+    MAPREDUCE-5425. Junit in TestJobHistoryServer failing in jdk 7 (Robert
+    Parker via jlowe)
+
 Release 2.1.0-beta - 2013-08-06
 
   INCOMPATIBLE CHANGES
@@ -636,8 +653,8 @@ Release 2.1.0-beta - 2013-08-06
     MAPREDUCE-5419. TestSlive is getting FileNotFound Exception (Robert Parker
     via jlowe)
 
-    MAPREDUCE-5428.  HistoryFileManager doesn't stop threads when service is
-    stopped (Karthik Kambatla via jlowe)
+    MAPREDUCE-5399. Unnecessary Configuration instantiation in IFileInputStream
+    slows down merge. (Stanislav Barton via Sandy Ryza)
 
   BREAKDOWN OF HADOOP-8562 SUBTASKS
 
@@ -1273,6 +1290,8 @@ Release 0.23.10 - UNRELEASED
     MAPREDUCE-5419. TestSlive is getting FileNotFound Exception (Robert Parker
     via jlowe)
 
+    MAPREDUCE-5440. TestCopyCommitter Fails on JDK7 (Robert Parker via jlowe)
+
 Release 0.23.9 - 2013-07-08
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1509426-1512447

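The headline change in this merge is MAPREDUCE-434: LocalJobRunner no longer clamps jobs to a single reducer and can run several reduce tasks in parallel. A minimal sketch of how a client job opts in, assuming only the mapreduce.local.reduce.tasks.maximum property introduced below (LocalJobRunner.LOCAL_MAX_REDUCES); the class and job names are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class LocalMultiReduce {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapreduce.framework.name", "local");
        // Allow up to four reduce tasks to run concurrently in-process.
        conf.setInt("mapreduce.local.reduce.tasks.maximum", 4);

        Job job = Job.getInstance(conf, "local-multi-reduce");
        job.setNumReduceTasks(4); // no longer forced back to 1 in local mode
        // ... configure mapper, reducer, input and output paths as usual ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
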
Modified: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Tue Aug 13 21:19:53 2013
@@ -357,15 +357,20 @@ public class MRApp extends MRAppMaster {
   }
 
   public void waitForState(Service.STATE finalState) throws Exception {
-    int timeoutSecs = 0;
-    while (!finalState.equals(getServiceState()) && timeoutSecs++ < 20) {
-      System.out.println("MRApp State is : " + getServiceState()
-          + " Waiting for state : " + finalState);
-      Thread.sleep(500);
-    }
-    System.out.println("MRApp State is : " + getServiceState());
-    Assert.assertEquals("MRApp state is not correct (timedout)", finalState,
-        getServiceState());
+    if (finalState == Service.STATE.STOPPED) {
+       Assert.assertTrue("Timeout while waiting for MRApp to stop",
+           waitForServiceToStop(20 * 1000));
+    } else {
+      int timeoutSecs = 0;
+      while (!finalState.equals(getServiceState()) && timeoutSecs++ < 20) {
+        System.out.println("MRApp State is : " + getServiceState()
+            + " Waiting for state : " + finalState);
+        Thread.sleep(500);
+      }
+      System.out.println("MRApp State is : " + getServiceState());
+      Assert.assertEquals("MRApp state is not correct (timedout)", finalState,
+          getServiceState());
+    }
   }
 
   public void verifyCompleted() {

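The waitForState() change above special-cases Service.STATE.STOPPED: rather than polling, it blocks in waitForServiceToStop(20 * 1000), which returns false on timeout. The retained else-branch is the ordinary poll-until-state idiom; a standalone sketch of it, assuming only the org.apache.hadoop.service.Service API (MRApp's version also logs progress and asserts via JUnit):

    import org.apache.hadoop.service.Service;

    public final class ServiceStates {
      private ServiceStates() {}

      public static void awaitState(Service svc, Service.STATE expected)
          throws InterruptedException {
        int tries = 0;
        while (!expected.equals(svc.getServiceState()) && tries++ < 20) {
          Thread.sleep(500); // 20 tries * 500 ms gives a ~10 second budget
        }
        if (!expected.equals(svc.getServiceState())) {
          throw new IllegalStateException("Timed out waiting for state "
              + expected + "; service is in " + svc.getServiceState());
        }
      }
    }
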
Modified: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Aug 13 21:19:53 2013
@@ -79,11 +79,15 @@ public class LocalJobRunner implements C
   public static final String LOCAL_MAX_MAPS =
     "mapreduce.local.map.tasks.maximum";
 
+  /** The maximum number of reduce tasks to run in parallel in LocalJobRunner */
+  public static final String LOCAL_MAX_REDUCES =
+    "mapreduce.local.reduce.tasks.maximum";
+
   private FileSystem fs;
   private HashMap<JobID, Job> jobs = new HashMap<JobID, Job>();
   private JobConf conf;
   private AtomicInteger map_tasks = new AtomicInteger(0);
-  private int reduce_tasks = 0;
+  private AtomicInteger reduce_tasks = new AtomicInteger(0);
   final Random rand = new Random();
   
   private LocalJobRunnerMetrics myMetrics = null;
@@ -115,9 +119,11 @@ public class LocalJobRunner implements C
     private JobConf job;
 
     private int numMapTasks;
+    private int numReduceTasks;
     private float [] partialMapProgress;
+    private float [] partialReduceProgress;
     private Counters [] mapCounters;
-    private Counters reduceCounters;
+    private Counters [] reduceCounters;
 
     private JobStatus status;
     private List<TaskAttemptID> mapIds = Collections.synchronizedList(
@@ -146,7 +152,9 @@ public class LocalJobRunner implements C
       this.id = jobid;
       JobConf conf = new JobConf(systemJobFile);
       this.localFs = FileSystem.getLocal(conf);
-      this.localJobDir = localFs.makeQualified(conf.getLocalPath(jobDir));
+      String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      this.localJobDir = localFs.makeQualified(new Path(
+          new Path(conf.getLocalPath(jobDir), user), jobid.toString()));
       this.localJobFile = new Path(this.localJobDir, id + ".xml");
 
       // Manage the distributed cache.  If there are files to be copied,
@@ -182,10 +190,14 @@ public class LocalJobRunner implements C
       this.start();
     }
 
+    protected abstract class RunnableWithThrowable implements Runnable {
+      public volatile Throwable storedException;
+    }
+
     /**
      * A Runnable instance that handles a map task to be run by an executor.
      */
-    protected class MapTaskRunnable implements Runnable {
+    protected class MapTaskRunnable extends RunnableWithThrowable {
       private final int taskId;
       private final TaskSplitMetaInfo info;
       private final JobID jobId;
@@ -196,8 +208,6 @@ public class LocalJobRunner implements C
       // where to fetch mapper outputs.
       private final Map<TaskAttemptID, MapOutputFile> mapOutputFiles;
 
-      public volatile Throwable storedException;
-
       public MapTaskRunnable(TaskSplitMetaInfo info, int taskId, JobID jobId,
           Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
         this.info = info;
@@ -217,7 +227,7 @@ public class LocalJobRunner implements C
             info.getSplitIndex(), 1);
           map.setUser(UserGroupInformation.getCurrentUser().
               getShortUserName());
-          setupChildMapredLocalDirs(map, localConf);
+          setupChildMapredLocalDirs(localJobDir, map, localConf);
 
           MapOutputFile mapOutput = new MROutputFiles();
           mapOutput.setConf(localConf);
@@ -251,12 +261,13 @@ public class LocalJobRunner implements C
      * @param mapOutputFiles a mapping from task attempts to output files
      * @return a List of Runnables, one per map task.
      */
-    protected List<MapTaskRunnable> getMapTaskRunnables(
+    protected List<RunnableWithThrowable> getMapTaskRunnables(
         TaskSplitMetaInfo [] taskInfo, JobID jobId,
         Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
 
       int numTasks = 0;
-      ArrayList<MapTaskRunnable> list = new ArrayList<MapTaskRunnable>();
+      ArrayList<RunnableWithThrowable> list =
+          new ArrayList<RunnableWithThrowable>();
       for (TaskSplitMetaInfo task : taskInfo) {
         list.add(new MapTaskRunnable(task, numTasks++, jobId,
             mapOutputFiles));
@@ -265,12 +276,89 @@ public class LocalJobRunner implements C
       return list;
     }
 
+    protected class ReduceTaskRunnable extends RunnableWithThrowable {
+      private final int taskId;
+      private final JobID jobId;
+      private final JobConf localConf;
+
+      // This is a reference to a shared object passed in by the
+      // external context; this delivers state to the reducers regarding
+      // where to fetch mapper outputs.
+      private final Map<TaskAttemptID, MapOutputFile> mapOutputFiles;
+
+      public ReduceTaskRunnable(int taskId, JobID jobId,
+          Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
+        this.taskId = taskId;
+        this.jobId = jobId;
+        this.mapOutputFiles = mapOutputFiles;
+        this.localConf = new JobConf(job);
+        this.localConf.set("mapreduce.jobtracker.address", "local");
+      }
+
+      public void run() {
+        try {
+          TaskAttemptID reduceId = new TaskAttemptID(new TaskID(
+              jobId, TaskType.REDUCE, taskId), 0);
+          LOG.info("Starting task: " + reduceId);
+
+          ReduceTask reduce = new ReduceTask(systemJobFile.toString(),
+              reduceId, taskId, mapIds.size(), 1);
+          reduce.setUser(UserGroupInformation.getCurrentUser().
+              getShortUserName());
+          setupChildMapredLocalDirs(localJobDir, reduce, localConf);
+          reduce.setLocalMapFiles(mapOutputFiles);
+
+          if (!Job.this.isInterrupted()) {
+            reduce.setJobFile(localJobFile.toString());
+            localConf.setUser(reduce.getUser());
+            reduce.localizeConfiguration(localConf);
+            reduce.setConf(localConf);
+            try {
+              reduce_tasks.getAndIncrement();
+              myMetrics.launchReduce(reduce.getTaskID());
+              reduce.run(localConf, Job.this);
+              myMetrics.completeReduce(reduce.getTaskID());
+            } finally {
+              reduce_tasks.getAndDecrement();
+            }
+
+            LOG.info("Finishing task: " + reduceId);
+          } else {
+            throw new InterruptedException();
+          }
+        } catch (Throwable t) {
+          // store this to be rethrown in the initial thread context.
+          this.storedException = t;
+        }
+      }
+    }
+
+    /**
+     * Create Runnables to encapsulate reduce tasks for use by the executor
+     * service.
+     * @param jobId the job id
+     * @param mapOutputFiles a mapping from task attempts to output files
+     * @return a List of Runnables, one per reduce task.
+     */
+    protected List<RunnableWithThrowable> getReduceTaskRunnables(
+        JobID jobId, Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
+
+      int taskId = 0;
+      ArrayList<RunnableWithThrowable> list =
+          new ArrayList<RunnableWithThrowable>();
+      for (int i = 0; i < this.numReduceTasks; i++) {
+        list.add(new ReduceTaskRunnable(taskId++, jobId, mapOutputFiles));
+      }
+
+      return list;
+    }
+
     /**
      * Initialize the counters that will hold partial-progress from
      * the various task attempts.
      * @param numMaps the number of map tasks in this job.
      */
-    private synchronized void initCounters(int numMaps) {
+    private synchronized void initCounters(int numMaps, int numReduces) {
       // Initialize state trackers for all map tasks.
       this.partialMapProgress = new float[numMaps];
       this.mapCounters = new Counters[numMaps];
@@ -278,16 +366,22 @@ public class LocalJobRunner implements C
         this.mapCounters[i] = new Counters();
       }
 
-      this.reduceCounters = new Counters();
+      this.partialReduceProgress = new float[numReduces];
+      this.reduceCounters = new Counters[numReduces];
+      for (int i = 0; i < numReduces; i++) {
+        this.reduceCounters[i] = new Counters();
+      }
+
+      this.numMapTasks = numMaps;
+      this.numReduceTasks = numReduces;
     }
 
     /**
      * Creates the executor service used to run map tasks.
      *
-     * @param numMapTasks the total number of map tasks to be run
      * @return an ExecutorService instance that handles map tasks
      */
-    protected ExecutorService createMapExecutor(int numMapTasks) {
+    protected synchronized ExecutorService createMapExecutor() {
 
       // Determine the size of the thread pool to use
       int maxMapThreads = job.getInt(LOCAL_MAX_MAPS, 1);
@@ -295,13 +389,10 @@ public class LocalJobRunner implements C
         throw new IllegalArgumentException(
             "Configured " + LOCAL_MAX_MAPS + " must be >= 1");
       }
-      this.numMapTasks = numMapTasks;
       maxMapThreads = Math.min(maxMapThreads, this.numMapTasks);
       maxMapThreads = Math.max(maxMapThreads, 1); // In case of no tasks.
 
-      initCounters(this.numMapTasks);
-
-      LOG.debug("Starting thread pool executor.");
+      LOG.debug("Starting mapper thread pool executor.");
       LOG.debug("Max local threads: " + maxMapThreads);
       LOG.debug("Map tasks to process: " + this.numMapTasks);
 
@@ -313,6 +404,65 @@ public class LocalJobRunner implements C
 
       return executor;
     }
+    
+    /**
+     * Creates the executor service used to run reduce tasks.
+     *
+     * @return an ExecutorService instance that handles reduce tasks
+     */
+    protected synchronized ExecutorService createReduceExecutor() {
+
+      // Determine the size of the thread pool to use
+      int maxReduceThreads = job.getInt(LOCAL_MAX_REDUCES, 1);
+      if (maxReduceThreads < 1) {
+        throw new IllegalArgumentException(
+            "Configured " + LOCAL_MAX_REDUCES + " must be >= 1");
+      }
+      maxReduceThreads = Math.min(maxReduceThreads, this.numReduceTasks);
+      maxReduceThreads = Math.max(maxReduceThreads, 1); // In case of no tasks.
+
+      LOG.debug("Starting reduce thread pool executor.");
+      LOG.debug("Max local threads: " + maxReduceThreads);
+      LOG.debug("Reduce tasks to process: " + this.numReduceTasks);
+
+      // Create a new executor service to drain the work queue.
+      ExecutorService executor = Executors.newFixedThreadPool(maxReduceThreads);
+
+      return executor;
+    }
+
+    /** Runs a set of tasks and waits for them to complete. */
+    private void runTasks(List<RunnableWithThrowable> runnables,
+        ExecutorService service, String taskType) throws Exception {
+      // Start populating the executor with work units.
+      // They may begin running immediately (in other threads).
+      for (Runnable r : runnables) {
+        service.submit(r);
+      }
+
+      try {
+        service.shutdown(); // Instructs queue to drain.
+
+        // Wait for tasks to finish; do not use a time-based timeout.
+        // (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024)
+        LOG.info("Waiting for " + taskType + " tasks");
+        service.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+      } catch (InterruptedException ie) {
+        // Cancel all threads.
+        service.shutdownNow();
+        throw ie;
+      }
+
+      LOG.info(taskType + " task executor complete.");
+
+      // After waiting for the tasks to complete, if any of these
+      // have thrown an exception, rethrow it now in the main thread context.
+      for (RunnableWithThrowable r : runnables) {
+        if (r.storedException != null) {
+          throw new Exception(r.storedException);
+        }
+      }
+    }
 
     private org.apache.hadoop.mapreduce.OutputCommitter 
     createOutputCommitter(boolean newApiCommitter, JobID jobId, Configuration conf) throws Exception {
@@ -358,94 +508,25 @@ public class LocalJobRunner implements C
           SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
 
         int numReduceTasks = job.getNumReduceTasks();
-        if (numReduceTasks > 1 || numReduceTasks < 0) {
-          // we only allow 0 or 1 reducer in local mode
-          numReduceTasks = 1;
-          job.setNumReduceTasks(1);
-        }
         outputCommitter.setupJob(jContext);
         status.setSetupProgress(1.0f);
 
         Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
             Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());
-
-        List<MapTaskRunnable> taskRunnables = getMapTaskRunnables(taskSplitMetaInfos,
-            jobId, mapOutputFiles);
-        ExecutorService mapService = createMapExecutor(taskRunnables.size());
-
-        // Start populating the executor with work units.
-        // They may begin running immediately (in other threads).
-        for (Runnable r : taskRunnables) {
-          mapService.submit(r);
-        }
+        
+        List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(
+            taskSplitMetaInfos, jobId, mapOutputFiles);
+              
+        initCounters(mapRunnables.size(), numReduceTasks);
+        ExecutorService mapService = createMapExecutor();
+        runTasks(mapRunnables, mapService, "map");
 
         try {
-          mapService.shutdown(); // Instructs queue to drain.
-
-          // Wait for tasks to finish; do not use a time-based timeout.
-          // (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024)
-          LOG.info("Waiting for map tasks");
-          mapService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
-        } catch (InterruptedException ie) {
-          // Cancel all threads.
-          mapService.shutdownNow();
-          throw ie;
-        }
-
-        LOG.info("Map task executor complete.");
-
-        // After waiting for the map tasks to complete, if any of these
-        // have thrown an exception, rethrow it now in the main thread context.
-        for (MapTaskRunnable r : taskRunnables) {
-          if (r.storedException != null) {
-            throw new Exception(r.storedException);
-          }
-        }
-
-        TaskAttemptID reduceId =
-          new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);
-        try {
           if (numReduceTasks > 0) {
-            ReduceTask reduce = new ReduceTask(systemJobFile.toString(), 
-                reduceId, 0, mapIds.size(), 1);
-            reduce.setUser(UserGroupInformation.getCurrentUser().
-                getShortUserName());
-            JobConf localConf = new JobConf(job);
-            localConf.set("mapreduce.jobtracker.address", "local");
-            setupChildMapredLocalDirs(reduce, localConf);
-            // move map output to reduce input  
-            for (int i = 0; i < mapIds.size(); i++) {
-              if (!this.isInterrupted()) {
-                TaskAttemptID mapId = mapIds.get(i);
-                Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
-                MapOutputFile localOutputFile = new MROutputFiles();
-                localOutputFile.setConf(localConf);
-                Path reduceIn =
-                  localOutputFile.getInputFileForWrite(mapId.getTaskID(),
-                        localFs.getFileStatus(mapOut).getLen());
-                if (!localFs.mkdirs(reduceIn.getParent())) {
-                  throw new IOException("Mkdirs failed to create "
-                      + reduceIn.getParent().toString());
-                }
-                if (!localFs.rename(mapOut, reduceIn))
-                  throw new IOException("Couldn't rename " + mapOut);
-              } else {
-                throw new InterruptedException();
-              }
-            }
-            if (!this.isInterrupted()) {
-              reduce.setJobFile(localJobFile.toString());
-              localConf.setUser(reduce.getUser());
-              reduce.localizeConfiguration(localConf);
-              reduce.setConf(localConf);
-              reduce_tasks += 1;
-              myMetrics.launchReduce(reduce.getTaskID());
-              reduce.run(localConf, this);
-              myMetrics.completeReduce(reduce.getTaskID());
-              reduce_tasks -= 1;
-            } else {
-              throw new InterruptedException();
-            }
+            List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(
+                jobId, mapOutputFiles);
+            ExecutorService reduceService = createReduceExecutor();
+            runTasks(reduceRunnables, reduceService, "reduce");
           }
         } finally {
           for (MapOutputFile output : mapOutputFiles.values()) {
@@ -463,7 +544,6 @@ public class LocalJobRunner implements C
         }
 
         JobEndNotifier.localRunnerNotification(job, status);
-
       } catch (Throwable t) {
         try {
           outputCommitter.abortJob(jContext, 
@@ -509,12 +589,13 @@ public class LocalJobRunner implements C
           new ByteArrayInputStream(baos.toByteArray())));
       
       LOG.info(taskStatus.getStateString());
-      int taskIndex = mapIds.indexOf(taskId);
-      if (taskIndex >= 0) {                       // mapping
+      int mapTaskIndex = mapIds.indexOf(taskId);
+      if (mapTaskIndex >= 0) {
+        // mapping
         float numTasks = (float) this.numMapTasks;
 
-        partialMapProgress[taskIndex] = taskStatus.getProgress();
-        mapCounters[taskIndex] = taskStatus.getCounters();
+        partialMapProgress[mapTaskIndex] = taskStatus.getProgress();
+        mapCounters[mapTaskIndex] = taskStatus.getCounters();
 
         float partialProgress = 0.0f;
         for (float f : partialMapProgress) {
@@ -522,8 +603,18 @@ public class LocalJobRunner implements C
         }
         status.setMapProgress(partialProgress / numTasks);
       } else {
-        reduceCounters = taskStatus.getCounters();
-        status.setReduceProgress(taskStatus.getProgress());
+        // reducing
+        int reduceTaskIndex = taskId.getTaskID().getId();
+        float numTasks = (float) this.numReduceTasks;
+
+        partialReduceProgress[reduceTaskIndex] = taskStatus.getProgress();
+        reduceCounters[reduceTaskIndex] = taskStatus.getCounters();
+
+        float partialProgress = 0.0f;
+        for (float f : partialReduceProgress) {
+          partialProgress += f;
+        }
+        status.setReduceProgress(partialProgress / numTasks);
       }
 
       // ignore phase
@@ -543,7 +634,13 @@ public class LocalJobRunner implements C
       for (Counters c : mapCounters) {
         current = Counters.sum(current, c);
       }
-      current = Counters.sum(current, reduceCounters);
+
+      if (null != reduceCounters && reduceCounters.length > 0) {
+        for (Counters c : reduceCounters) {
+          current = Counters.sum(current, c);
+        }
+      }
+
       return current;
     }
 
@@ -682,8 +779,9 @@ public class LocalJobRunner implements C
   
   public ClusterMetrics getClusterMetrics() {
     int numMapTasks = map_tasks.get();
-    return new ClusterMetrics(numMapTasks, reduce_tasks, numMapTasks,
-        reduce_tasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
+    int numReduceTasks = reduce_tasks.get();
+    return new ClusterMetrics(numMapTasks, numReduceTasks, numMapTasks,
+        numReduceTasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
   }
 
   public JobTrackerStatus getJobTrackerStatus() {
@@ -814,6 +912,27 @@ public class LocalJobRunner implements C
     return job.getConfiguration().getInt(LOCAL_MAX_MAPS, 1);
   }
 
+
+  /**
+   * Set the max number of reduce tasks to run concurrently in the LocalJobRunner.
+   * @param job the job to configure
+   * @param maxReduces the maximum number of reduce tasks to allow.
+   */
+  public static void setLocalMaxRunningReduces(
+      org.apache.hadoop.mapreduce.JobContext job,
+      int maxReduces) {
+    job.getConfiguration().setInt(LOCAL_MAX_REDUCES, maxReduces);
+  }
+
+  /**
+   * @return the max number of reduce tasks to run concurrently in the
+   * LocalJobRunner.
+   */
+  public static int getLocalMaxRunningReduces(
+      org.apache.hadoop.mapreduce.JobContext job) {
+    return job.getConfiguration().getInt(LOCAL_MAX_REDUCES, 1);
+  }
+
   @Override
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
                                        ) throws IOException,
@@ -839,31 +958,27 @@ public class LocalJobRunner implements C
     throw new UnsupportedOperationException("Not supported");
   }
   
-  static void setupChildMapredLocalDirs(Task t, JobConf conf) {
+  static void setupChildMapredLocalDirs(Path localJobDir, Task t, JobConf conf) {
     String[] localDirs = conf.getTrimmedStrings(MRConfig.LOCAL_DIR);
-    String jobId = t.getJobID().toString();
     String taskId = t.getTaskID().toString();
     boolean isCleanup = t.isTaskCleanupTask();
-    String user = t.getUser();
     StringBuffer childMapredLocalDir =
         new StringBuffer(localDirs[0] + Path.SEPARATOR
-            + getLocalTaskDir(user, jobId, taskId, isCleanup));
+            + getLocalTaskDir(localJobDir, taskId, isCleanup));
     for (int i = 1; i < localDirs.length; i++) {
       childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
-          + getLocalTaskDir(user, jobId, taskId, isCleanup));
+          + getLocalTaskDir(localJobDir, taskId, isCleanup));
     }
     LOG.debug(MRConfig.LOCAL_DIR + " for child : " + childMapredLocalDir);
     conf.set(MRConfig.LOCAL_DIR, childMapredLocalDir.toString());
   }
   
   static final String TASK_CLEANUP_SUFFIX = ".cleanup";
-  static final String SUBDIR = jobDir;
   static final String JOBCACHE = "jobcache";
   
-  static String getLocalTaskDir(String user, String jobid, String taskid,
+  static String getLocalTaskDir(Path localJobDir, String taskid,
       boolean isCleanupAttempt) {
-    String taskDir = SUBDIR + Path.SEPARATOR + user + Path.SEPARATOR + JOBCACHE
-      + Path.SEPARATOR + jobid + Path.SEPARATOR + taskid;
+    String taskDir = localJobDir.toString() + Path.SEPARATOR + taskid;
     if (isCleanupAttempt) {
       taskDir = taskDir + TASK_CLEANUP_SUFFIX;
     }

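The LocalJobRunner rewrite above extracts the submit/drain/rethrow sequence that previously existed only for map tasks into runTasks(), and the new RunnableWithThrowable base class lets both map and reduce runnables hand failures back to the submitting thread instead of losing them inside the pool. A compact sketch of that pattern, detached from the runner:

    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    public final class TaskDraining {
      private TaskDraining() {}

      // Tasks record any failure rather than letting the pool swallow it.
      public abstract static class CapturingRunnable implements Runnable {
        public volatile Throwable storedException;
      }

      public static void runAll(List<? extends CapturingRunnable> tasks,
          ExecutorService pool) throws Exception {
        for (Runnable r : tasks) {
          pool.submit(r);
        }
        try {
          pool.shutdown(); // stop accepting work; let the queue drain
          pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException ie) {
          pool.shutdownNow(); // cancel anything still running
          throw ie;
        }
        // Rethrow the first recorded failure in the caller's context.
        for (CapturingRunnable r : tasks) {
          if (r.storedException != null) {
            throw new Exception(r.storedException);
          }
        }
      }
    }
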
Modified: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java Tue Aug 13 21:19:53 2013
@@ -81,6 +81,8 @@ public class BackupStore<K,V> {
   private boolean inReset = false;
   private boolean clearMarkFlag = false;
   private boolean lastSegmentEOF = false;
+  
+  private Configuration conf;
 
   public BackupStore(Configuration conf, TaskAttemptID taskid)
   throws IOException {
@@ -106,6 +108,8 @@ public class BackupStore<K,V> {
     fileCache = new FileCache(conf);
     tid = taskid;
     
+    this.conf = conf;
+    
     LOG.info("Created a new BackupStore with a memory of " + maxSize);
 
   }
@@ -500,7 +504,7 @@ public class BackupStore<K,V> {
       Reader<K, V> reader = 
         new org.apache.hadoop.mapreduce.task.reduce.InMemoryReader<K, V>(null, 
             (org.apache.hadoop.mapred.TaskAttemptID) tid, 
-            dataOut.getData(), 0, usedSize);
+            dataOut.getData(), 0, usedSize, conf);
       Segment<K, V> segment = new Segment<K, V>(reader, false);
       segmentList.add(segment);
       LOG.debug("Added Memory Segment to List. List Size is " + 

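BackupStore now keeps the Configuration it was constructed with and passes it to InMemoryReader, in line with the MAPREDUCE-5399 note above: each new Configuration() re-reads and parses resource XML, so merge-path objects should reuse one. A trivial sketch of the pattern (hypothetical class, not the Hadoop source):

    import org.apache.hadoop.conf.Configuration;

    public class ConfReuse {
      private final Configuration conf; // parsed once, reused on every read

      public ConfReuse(Configuration conf) {
        this.conf = conf;
      }

      public int ioSortFactor() {
        // Consult the cached conf instead of constructing a fresh one.
        return conf.getInt("mapreduce.task.io.sort.factor", 100);
      }
    }
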
Modified: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Tue Aug 13 21:19:53 2013
@@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.JobCo
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
 import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
@@ -1860,7 +1861,6 @@ public class MapTask extends Task {
       }
       {
         sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
-        Merger.considerFinalMergeForProgress();
         
         IndexRecord rec = new IndexRecord();
         final SpillRecord spillRec = new SpillRecord(partitions);
@@ -1893,7 +1893,8 @@ public class MapTask extends Task {
                          segmentList, mergeFactor,
                          new Path(mapId.toString()),
                          job.getOutputKeyComparator(), reporter, sortSegments,
-                         null, spilledRecordsCounter, sortPhase.phase());
+                         null, spilledRecordsCounter, sortPhase.phase(),
+                         TaskType.MAP);
 
           //write merged output to disk
           long segmentStart = finalOut.getPos();

Modified: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java Tue Aug 13 21:19:53 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.io.compress.Com
 import org.apache.hadoop.mapred.IFile.Reader;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.util.PriorityQueue;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
@@ -69,7 +70,8 @@ public class Merger {  
   throws IOException {
     return 
       new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator, 
-                           reporter, null).merge(keyClass, valueClass,
+                           reporter, null,
+                           TaskType.REDUCE).merge(keyClass, valueClass,
                                            mergeFactor, tmpDir,
                                            readsCounter, writesCounter, 
                                            mergePhase);
@@ -90,7 +92,8 @@ public class Merger {  
   throws IOException {
     return 
       new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator, 
-                           reporter, mergedMapOutputsCounter).merge(
+                           reporter, mergedMapOutputsCounter,
+                           TaskType.REDUCE).merge(
                                            keyClass, valueClass,
                                            mergeFactor, tmpDir,
                                            readsCounter, writesCounter,
@@ -124,7 +127,8 @@ public class Merger {  
                             Progress mergePhase)
       throws IOException {
     return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
-                           sortSegments).merge(keyClass, valueClass,
+                           sortSegments,
+                           TaskType.REDUCE).merge(keyClass, valueClass,
                                                mergeFactor, tmpDir,
                                                readsCounter, writesCounter,
                                                mergePhase);
@@ -140,10 +144,12 @@ public class Merger {  
                             boolean sortSegments,
                             Counters.Counter readsCounter,
                             Counters.Counter writesCounter,
-                            Progress mergePhase)
+                            Progress mergePhase,
+                            TaskType taskType)
       throws IOException {
     return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
-                           sortSegments, codec).merge(keyClass, valueClass,
+                           sortSegments, codec,
+                           taskType).merge(keyClass, valueClass,
                                                mergeFactor, tmpDir,
                                                readsCounter, writesCounter,
                                                mergePhase);
@@ -161,7 +167,8 @@ public class Merger {  
                             Progress mergePhase)
       throws IOException {
     return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
-                           sortSegments).merge(keyClass, valueClass,
+                           sortSegments,
+                           TaskType.REDUCE).merge(keyClass, valueClass,
                                                mergeFactor, inMemSegments,
                                                tmpDir,
                                                readsCounter, writesCounter,
@@ -182,7 +189,8 @@ public class Merger {  
                           Progress mergePhase)
     throws IOException {
   return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
-                         sortSegments, codec).merge(keyClass, valueClass,
+                         sortSegments, codec,
+                         TaskType.REDUCE).merge(keyClass, valueClass,
                                              mergeFactor, inMemSegments,
                                              tmpDir,
                                              readsCounter, writesCounter,
@@ -366,20 +374,7 @@ public class Merger {  
       }
     }
   }
-  
-  // Boolean variable for including/considering final merge as part of sort
-  // phase or not. This is true in map task, false in reduce task. It is
-  // used in calculating mergeProgress.
-  static boolean includeFinalMerge = false;
-  
-  /**
-   * Sets the boolean variable includeFinalMerge to true. Called from
-   * map task before calling merge() so that final merge of map task
-   * is also considered as part of sort phase.
-   */
-  static void considerFinalMergeForProgress() {
-    includeFinalMerge = true;
-  }
+
   
   private static class MergeQueue<K extends Object, V extends Object> 
   extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator {
@@ -401,6 +396,21 @@ public class Merger {  
     final DataInputBuffer value = new DataInputBuffer();
     final DataInputBuffer diskIFileValue = new DataInputBuffer();
     
+    
+    // Boolean variable for including/considering final merge as part of sort
+    // phase or not. This is true in map task, false in reduce task. It is
+    // used in calculating mergeProgress.
+    private boolean includeFinalMerge = false;
+    
+    /**
+     * Sets the boolean variable includeFinalMerge to true. Called from
+     * map task before calling merge() so that final merge of map task
+     * is also considered as part of sort phase.
+     */
+    private void considerFinalMergeForProgress() {
+      includeFinalMerge = true;
+    }    
+    
     Segment<K, V> minSegment;
     Comparator<Segment<K, V>> segmentComparator =   
       new Comparator<Segment<K, V>>() {
@@ -419,14 +429,16 @@ public class Merger {  
                       CompressionCodec codec, RawComparator<K> comparator,
                       Progressable reporter) 
     throws IOException {
-      this(conf, fs, inputs, deleteInputs, codec, comparator, reporter, null);
+      this(conf, fs, inputs, deleteInputs, codec, comparator, reporter, null,
+          TaskType.REDUCE);
     }
     
     public MergeQueue(Configuration conf, FileSystem fs, 
                       Path[] inputs, boolean deleteInputs, 
                       CompressionCodec codec, RawComparator<K> comparator,
                       Progressable reporter, 
-                      Counters.Counter mergedMapOutputsCounter) 
+                      Counters.Counter mergedMapOutputsCounter,
+                      TaskType taskType) 
     throws IOException {
       this.conf = conf;
       this.fs = fs;
@@ -434,6 +446,10 @@ public class Merger {  
       this.comparator = comparator;
       this.reporter = reporter;
       
+      if (taskType == TaskType.MAP) {
+        considerFinalMergeForProgress();
+      }
+      
       for (Path file : inputs) {
         LOG.debug("MergeQ: adding: " + file);
         segments.add(new Segment<K, V>(conf, fs, file, codec, !deleteInputs, 
@@ -449,17 +465,20 @@ public class Merger {  
     public MergeQueue(Configuration conf, FileSystem fs,
         List<Segment<K, V>> segments, RawComparator<K> comparator,
         Progressable reporter) {
-      this(conf, fs, segments, comparator, reporter, false);
+      this(conf, fs, segments, comparator, reporter, false, TaskType.REDUCE);
     }
 
     public MergeQueue(Configuration conf, FileSystem fs, 
         List<Segment<K, V>> segments, RawComparator<K> comparator,
-        Progressable reporter, boolean sortSegments) {
+        Progressable reporter, boolean sortSegments, TaskType taskType) {
       this.conf = conf;
       this.fs = fs;
       this.comparator = comparator;
       this.segments = segments;
       this.reporter = reporter;
+      if (taskType == TaskType.MAP) {
+        considerFinalMergeForProgress();
+      }
       if (sortSegments) {
         Collections.sort(segments, segmentComparator);
       }
@@ -467,8 +486,10 @@ public class Merger {  
 
     public MergeQueue(Configuration conf, FileSystem fs,
         List<Segment<K, V>> segments, RawComparator<K> comparator,
-        Progressable reporter, boolean sortSegments, CompressionCodec codec) {
-      this(conf, fs, segments, comparator, reporter, sortSegments);
+        Progressable reporter, boolean sortSegments, CompressionCodec codec,
+        TaskType taskType) {
+      this(conf, fs, segments, comparator, reporter, sortSegments,
+          taskType);
       this.codec = codec;
     }
 

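Moving includeFinalMerge from a static field on Merger to per-instance state on MergeQueue, selected through the new TaskType parameter, matters precisely because this merge allows several local tasks to merge concurrently in one JVM: a static flag set by a map-side merge would leak into reduce-side progress accounting. A reduced sketch of the shape of the fix (illustrative class, not the Hadoop source):

    import org.apache.hadoop.mapreduce.TaskType;

    public class ProgressMerge {
      // Instance state, not static: safe with concurrent merges per JVM.
      private final boolean includeFinalMerge;

      public ProgressMerge(TaskType taskType) {
        // Map-side merges count the final merge toward the sort phase.
        this.includeFinalMerge = (taskType == TaskType.MAP);
      }

      public boolean includesFinalMerge() {
        return includeFinalMerge;
      }
    }
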
Modified: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java Tue Aug 13 21:19:53 2013
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
@@ -74,6 +75,10 @@ public class ReduceTask extends Task {
 
   private CompressionCodec codec;
 
+  // If this is a LocalJobRunner-based job, this will
+  // be a mapping from map task attempts to their output files.
+  // This will be null in other cases.
+  private Map<TaskAttemptID, MapOutputFile> localMapFiles;
 
   { 
     getProgress().setStatus("reduce"); 
@@ -105,24 +110,24 @@ public class ReduceTask extends Task {
   // file paths, the first parameter is considered smaller than the second one.
   // In case of files with same size and path are considered equal.
   private Comparator<FileStatus> mapOutputFileComparator = 
-    new Comparator<FileStatus>() {
-      public int compare(FileStatus a, FileStatus b) {
-        if (a.getLen() < b.getLen())
-          return -1;
-        else if (a.getLen() == b.getLen())
-          if (a.getPath().toString().equals(b.getPath().toString()))
-            return 0;
-          else
-            return -1; 
+      new Comparator<FileStatus>() {
+    public int compare(FileStatus a, FileStatus b) {
+      if (a.getLen() < b.getLen())
+        return -1;
+      else if (a.getLen() == b.getLen())
+        if (a.getPath().toString().equals(b.getPath().toString()))
+          return 0;
         else
-          return 1;
-      }
+          return -1; 
+      else
+        return 1;
+    }
   };
-  
+
   // A sorted set for keeping a set of map output files on disk
   private final SortedSet<FileStatus> mapOutputFilesOnDisk = 
-    new TreeSet<FileStatus>(mapOutputFileComparator);
-
+      new TreeSet<FileStatus>(mapOutputFileComparator);
+  
   public ReduceTask() {
     super();
   }
@@ -133,6 +138,17 @@ public class ReduceTask extends Task {
     this.numMaps = numMaps;
   }
   
+
+  /**
+   * Register the set of mapper outputs created by a LocalJobRunner-based
+   * job with this ReduceTask so it knows where to fetch from.
+   *
+   * This should not be called in normal (networked) execution.
+   */
+  public void setLocalMapFiles(Map<TaskAttemptID, MapOutputFile> mapFiles) {
+    this.localMapFiles = mapFiles;
+  }
+
   private CompressionCodec initCodec() {
     // check if map-outputs are to be compressed
     if (conf.getCompressMapOutput()) {
@@ -174,20 +190,11 @@ public class ReduceTask extends Task {
     numMaps = in.readInt();
   }
   
-  // Get the input files for the reducer.
-  private Path[] getMapFiles(FileSystem fs, boolean isLocal) 
-  throws IOException {
+  // Get the input files for the reducer (for local jobs).
+  private Path[] getMapFiles(FileSystem fs) throws IOException {
     List<Path> fileList = new ArrayList<Path>();
-    if (isLocal) {
-      // for local jobs
-      for(int i = 0; i < numMaps; ++i) {
-        fileList.add(mapOutputFile.getInputFile(i));
-      }
-    } else {
-      // for non local jobs
-      for (FileStatus filestatus : mapOutputFilesOnDisk) {
-        fileList.add(filestatus.getPath());
-      }
+    for(int i = 0; i < numMaps; ++i) {
+      fileList.add(mapOutputFile.getInputFile(i));
     }
     return fileList.toArray(new Path[0]);
   }
@@ -341,56 +348,33 @@ public class ReduceTask extends Task {
     // Initialize the codec
     codec = initCodec();
     RawKeyValueIterator rIter = null;
-    ShuffleConsumerPlugin shuffleConsumerPlugin = null; 
-    
-    boolean isLocal = false; 
-    // local if
-    // 1) framework == local or
-    // 2) framework == null and job tracker address == local
-    String framework = job.get(MRConfig.FRAMEWORK_NAME);
-    String masterAddr = job.get(MRConfig.MASTER_ADDRESS, "local");
-    if ((framework == null && masterAddr.equals("local"))
-        || (framework != null && framework.equals(MRConfig.LOCAL_FRAMEWORK_NAME))) {
-      isLocal = true;
-    }
+    ShuffleConsumerPlugin shuffleConsumerPlugin = null;
     
-    if (!isLocal) {
-      Class combinerClass = conf.getCombinerClass();
-      CombineOutputCollector combineCollector = 
-        (null != combinerClass) ? 
- 	     new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
-
-      Class<? extends ShuffleConsumerPlugin> clazz =
-            job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
-						
-      shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
-      LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
-
-      ShuffleConsumerPlugin.Context shuffleContext = 
-        new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, 
-                    super.lDirAlloc, reporter, codec, 
-                    combinerClass, combineCollector, 
-                    spilledRecordsCounter, reduceCombineInputCounter,
-                    shuffledMapsCounter,
-                    reduceShuffleBytes, failedShuffleCounter,
-                    mergedMapOutputsCounter,
-                    taskStatus, copyPhase, sortPhase, this,
-                    mapOutputFile);
-      shuffleConsumerPlugin.init(shuffleContext);
-      rIter = shuffleConsumerPlugin.run();
-    } else {
-      // local job runner doesn't have a copy phase
-      copyPhase.complete();
-      final FileSystem rfs = FileSystem.getLocal(job).getRaw();
-      rIter = Merger.merge(job, rfs, job.getMapOutputKeyClass(),
-                           job.getMapOutputValueClass(), codec, 
-                           getMapFiles(rfs, true),
-                           !conf.getKeepFailedTaskFiles(), 
-                           job.getInt(JobContext.IO_SORT_FACTOR, 100),
-                           new Path(getTaskID().toString()), 
-                           job.getOutputKeyComparator(),
-                           reporter, spilledRecordsCounter, null, null);
-    }
+    Class combinerClass = conf.getCombinerClass();
+    CombineOutputCollector combineCollector = 
+      (null != combinerClass) ? 
+     new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
+
+    Class<? extends ShuffleConsumerPlugin> clazz =
+          job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
+					
+    shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
+    LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
+
+    ShuffleConsumerPlugin.Context shuffleContext = 
+      new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, 
+                  super.lDirAlloc, reporter, codec, 
+                  combinerClass, combineCollector, 
+                  spilledRecordsCounter, reduceCombineInputCounter,
+                  shuffledMapsCounter,
+                  reduceShuffleBytes, failedShuffleCounter,
+                  mergedMapOutputsCounter,
+                  taskStatus, copyPhase, sortPhase, this,
+                  mapOutputFile, localMapFiles);
+    shuffleConsumerPlugin.init(shuffleContext);
+
+    rIter = shuffleConsumerPlugin.run();
+
     // free up the data structures
     mapOutputFilesOnDisk.clear();
     
@@ -409,9 +393,7 @@ public class ReduceTask extends Task {
                     keyClass, valueClass);
     }
 
-    if (shuffleConsumerPlugin != null) {
-      shuffleConsumerPlugin.close();
-    }
+    shuffleConsumerPlugin.close();
     done(umbilical, reporter);
   }
 

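With the isLocal fork removed, ReduceTask now drives every job through a ShuffleConsumerPlugin; a local job differs only in the localMapFiles table the runner installs before run(), which the shuffle (via the new LocalFetcher) reads straight off disk. A sketch of that wiring, with placeholder values where LocalJobRunner supplies real ones:

    import java.util.Map;
    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.MapOutputFile;
    import org.apache.hadoop.mapred.ReduceTask;
    import org.apache.hadoop.mapred.TaskAttemptID;
    import org.apache.hadoop.mapred.TaskID;
    import org.apache.hadoop.mapreduce.TaskType;

    public class LocalReduceWiring {
      public static ReduceTask wire(String jobFile,
          Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
        JobID jobId = new JobID("local", 1);
        TaskAttemptID reduceId =
            new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);
        ReduceTask reduce = new ReduceTask(jobFile, reduceId,
            0 /* partition */, mapOutputFiles.size() /* numMaps */, 1);
        // Left null in normal (networked) execution; non-null tells the
        // shuffle to read map output from local files instead of over HTTP.
        reduce.setLocalMapFiles(mapOutputFiles);
        return reduce;
      }
    }
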
Modified: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java Tue Aug 13 21:19:53 2013
@@ -19,6 +19,8 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.Map;
+
 import org.apache.hadoop.mapred.Task.CombineOutputCollector;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -65,6 +67,7 @@ public interface ShuffleConsumerPlugin<K
     private final Progress mergePhase;
     private final Task reduceTask;
     private final MapOutputFile mapOutputFile;
+    private final Map<TaskAttemptID, MapOutputFile> localMapFiles;
 
     public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId,
                    JobConf jobConf, FileSystem localFS,
@@ -80,7 +83,8 @@ public interface ShuffleConsumerPlugin<K
                    Counters.Counter failedShuffleCounter,
                    Counters.Counter mergedMapOutputsCounter,
                    TaskStatus status, Progress copyPhase, Progress mergePhase,
-                   Task reduceTask, MapOutputFile mapOutputFile) {
+                   Task reduceTask, MapOutputFile mapOutputFile,
+                   Map<TaskAttemptID, MapOutputFile> localMapFiles) {
       this.reduceId = reduceId;
       this.jobConf = jobConf;
       this.localFS = localFS;
@@ -101,6 +105,7 @@ public interface ShuffleConsumerPlugin<K
       this.mergePhase = mergePhase;
       this.reduceTask = reduceTask;
       this.mapOutputFile = mapOutputFile;
+      this.localMapFiles = localMapFiles;
     }
 
     public org.apache.hadoop.mapreduce.TaskAttemptID getReduceId() {
@@ -163,6 +168,9 @@ public interface ShuffleConsumerPlugin<K
     public MapOutputFile getMapOutputFile() {
       return mapOutputFile;
     }
+    public Map<TaskAttemptID, MapOutputFile> getLocalMapFiles() {
+      return localMapFiles;
+    }
   } // end of public static class Context<K,V>
 
 }

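Since Context now exposes getLocalMapFiles(), a plugin can detect local-mode execution directly. A minimal sketch, assuming the interface contract is exactly the init/run/close sequence exercised by ReduceTask above (the method signatures and the mapred TaskAttemptID import are assumptions, not taken from this diff):

    import java.io.IOException;
    import java.util.Map;

    import org.apache.hadoop.mapred.MapOutputFile;
    import org.apache.hadoop.mapred.RawKeyValueIterator;
    import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
    import org.apache.hadoop.mapred.TaskAttemptID;

    public class SketchShufflePlugin<K, V> implements ShuffleConsumerPlugin<K, V> {
      private Map<TaskAttemptID, MapOutputFile> localMapFiles;

      @Override
      public void init(ShuffleConsumerPlugin.Context context) {
        // Non-null only when the reduce runs under the LocalJobRunner.
        localMapFiles = context.getLocalMapFiles();
      }

      @Override
      public RawKeyValueIterator run() throws IOException, InterruptedException {
        if (localMapFiles != null) {
          // Local mode: map outputs can be read straight off local disk
          // rather than fetched over HTTP.
        }
        return null; // a real plugin returns the merged key/value iterator
      }

      @Override
      public void close() { }
    }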
Modified: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Tue Aug 13 21:19:53 2013
@@ -60,7 +60,7 @@ class Fetcher<K,V> extends Thread {
   /* Default read timeout (in milliseconds) */
   private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
 
-  private final Reporter reporter;
+  protected final Reporter reporter;
   private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
                                     CONNECTION, WRONG_REDUCE}
   
@@ -71,13 +71,13 @@ class Fetcher<K,V> extends Thread {
   private final Counters.Counter badIdErrs;
   private final Counters.Counter wrongMapErrs;
   private final Counters.Counter wrongReduceErrs;
-  private final MergeManager<K,V> merger;
-  private final ShuffleSchedulerImpl<K,V> scheduler;
-  private final ShuffleClientMetrics metrics;
-  private final ExceptionReporter exceptionReporter;
-  private final int id;
+  protected final MergeManager<K,V> merger;
+  protected final ShuffleSchedulerImpl<K,V> scheduler;
+  protected final ShuffleClientMetrics metrics;
+  protected final ExceptionReporter exceptionReporter;
+  protected final int id;
   private static int nextId = 0;
-  private final int reduce;
+  protected final int reduce;
   
   private final int connectionTimeout;
   private final int readTimeout;

Modified: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java Tue Aug 13 21:19:53 2013
@@ -24,6 +24,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.mapred.IFile.Reader;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -39,11 +40,11 @@ public class InMemoryReader<K, V> extend
   DataInputBuffer memDataIn = new DataInputBuffer();
   private int start;
   private int length;
-
+  
   public InMemoryReader(MergeManagerImpl<K,V> merger, TaskAttemptID taskAttemptId,
-                        byte[] data, int start, int length)
+                        byte[] data, int start, int length, Configuration conf)
   throws IOException {
-    super(null, null, length - start, null, null);
+    super(conf, null, length - start, null, null);
     this.merger = merger;
     this.taskAttemptId = taskAttemptId;
 

Modified: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Tue Aug 13 21:19:53 2013
@@ -613,7 +613,7 @@ public class MergeManagerImpl<K, V> impl
       fullSize -= size;
       Reader<K,V> reader = new InMemoryReader<K,V>(MergeManagerImpl.this, 
                                                    mo.getMapId(),
-                                                   data, 0, (int)size);
+                                                   data, 0, (int)size, jobConf);
       inMemorySegments.add(new Segment<K,V>(reader, true, 
                                             (mo.isPrimaryMapOutput() ? 
                                             mergedMapOutputsCounter : null)));

Modified: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Tue Aug 13 21:19:53 2013
@@ -18,10 +18,12 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.Task;
@@ -56,6 +58,7 @@ public class Shuffle<K, V> implements Sh
   private Progress copyPhase;
   private TaskStatus taskStatus;
   private Task reduceTask; //Used for status updates
+  private Map<TaskAttemptID, MapOutputFile> localMapFiles;
 
   @Override
   public void init(ShuffleConsumerPlugin.Context context) {
@@ -69,6 +72,7 @@ public class Shuffle<K, V> implements Sh
     this.copyPhase = context.getCopyPhase();
     this.taskStatus = context.getStatus();
     this.reduceTask = context.getReduceTask();
+    this.localMapFiles = context.getLocalMapFiles();
     
     scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
         this, copyPhase, context.getShuffledMapsCounter(),
@@ -103,13 +107,22 @@ public class Shuffle<K, V> implements Sh
     eventFetcher.start();
     
     // Start the map-output fetcher threads
-    final int numFetchers = jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
+    boolean isLocal = localMapFiles != null;
+    final int numFetchers = isLocal ? 1 :
+      jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
     Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
-    for (int i=0; i < numFetchers; ++i) {
-      fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 
-                                     reporter, metrics, this, 
-                                     reduceTask.getShuffleSecret());
-      fetchers[i].start();
+    if (isLocal) {
+      fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
+          merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
+          localMapFiles);
+      fetchers[0].start();
+    } else {
+      for (int i=0; i < numFetchers; ++i) {
+        fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 
+                                       reporter, metrics, this, 
+                                       reduceTask.getShuffleSecret());
+        fetchers[i].start();
+      }
     }
     
     // Wait for shuffle to complete successfully

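With the change above, Shuffle.run() uses exactly one LocalFetcher whenever localMapFiles is non-null; only the remote path honors the parallel-copies setting. A small usage sketch for tuning that remote path:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    JobConf conf = new JobConf();
    // More parallel copier threads for the remote shuffle; local mode
    // ignores this and always runs a single LocalFetcher.
    conf.setInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 10);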
Modified: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java Tue Aug 13 21:19:53 2013
@@ -155,7 +155,7 @@ public class TestShufflePlugin<K, V> {
                                                 mockCounter, mockCounter, mockCounter,
                                                 mockCounter, mockCounter, mockCounter,
                                                 mockTaskStatus, mockProgress, mockProgress,
-                                                mockTask, mockMapOutputFile);
+                                                mockTask, mockMapOutputFile, null);
       shuffleConsumerPlugin.init(context);
       shuffleConsumerPlugin.run();
       shuffleConsumerPlugin.close();

Modified: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java Tue Aug 13 21:19:53 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.metrics2.lib.De
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
@@ -139,11 +140,13 @@ public class JobHistoryServer extends Co
     return this.clientService;
   }
 
-  public static void main(String[] args) {
-    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+  static JobHistoryServer launchJobHistoryServer(String[] args) {
+    Thread.
+        setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG);
+    JobHistoryServer jobHistoryServer = null;
     try {
-      JobHistoryServer jobHistoryServer = new JobHistoryServer();
+      jobHistoryServer = new JobHistoryServer();
       ShutdownHookManager.get().addShutdownHook(
           new CompositeServiceShutdownHook(jobHistoryServer),
           SHUTDOWN_HOOK_PRIORITY);
@@ -152,7 +155,12 @@ public class JobHistoryServer extends Co
       jobHistoryServer.start();
     } catch (Throwable t) {
       LOG.fatal("Error starting JobHistoryServer", t);
-      System.exit(-1);
+      ExitUtil.terminate(-1, "Error starting JobHistoryServer");
     }
+    return jobHistoryServer;
+  }
+
+  public static void main(String[] args) {
+    launchJobHistoryServer(args);
   }
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java Tue Aug 13 21:19:53 2013
@@ -63,14 +63,11 @@ public class TestJobHistoryServer {
   private static RecordFactory recordFactory = RecordFactoryProvider
           .getRecordFactory(null);
 
-
-  
   JobHistoryServer historyServer=null;
+
   // Simple init/start/stop test of JobHistoryServer; its status should change.
-  
   @Test (timeout= 50000 )
   public void testStartStopServer() throws Exception {
-
     historyServer = new JobHistoryServer();
     Configuration config = new Configuration();
     historyServer.init(config);
@@ -86,15 +83,9 @@ public class TestJobHistoryServer {
     historyServer.stop();
     assertEquals(STATE.STOPPED, historyServer.getServiceState());
     assertNotNull(historyService.getClientHandler().getConnectAddress());
-
-    
-    
   }
 
-
-
   // Test reports of JobHistoryServer. The history server should get log files from MRApp and read them.
-  
   @Test (timeout= 50000 )
   public void testReports() throws Exception {
     Configuration config = new Configuration();
@@ -128,7 +119,6 @@ public class TestJobHistoryServer {
     assertEquals(1, jobs.size());
     assertEquals("job_0_0000",jobs.keySet().iterator().next().toString());
     
-    
     Task task = job.getTasks().values().iterator().next();
     TaskAttempt attempt = task.getAttempts().values().iterator().next();
 
@@ -188,14 +178,14 @@ public class TestJobHistoryServer {
     assertEquals("", diagnosticResponse.getDiagnostics(0));
 
   }
- // test main method
+
+  // test launch method
   @Test (timeout =60000)
-  public void testMainMethod() throws Exception {
+  public void testLaunch() throws Exception {
 
     ExitUtil.disableSystemExit();
     try {
-      JobHistoryServer.main(new String[0]);
-
+      historyServer = JobHistoryServer.launchJobHistoryServer(new String[0]);
     } catch (ExitUtil.ExitException e) {
       assertEquals(0,e.status);
       ExitUtil.resetFirstExitException();

Modified: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java Tue Aug 13 21:19:53 2013
@@ -276,18 +276,16 @@ public class TestJobCounters {
     // there are too few spills to combine (2 < 3)
     // Each map spills 2^14 records, so maps spill 49152 records, combined.
 
-    // The reduce spill count is composed of the read from one segment and
-    // the intermediate merge of the other two. The intermediate merge
+    // The combiner has emitted 24576 records to the reducer; these are all
+    // fetched straight to memory from the map side. The intermediate merge
     // adds 8192 records per segment read; again, there are too few spills to
-    // combine, so all 16834 are written to disk (total 32768 spilled records
-    // for the intermediate merge). The merge into the reduce includes only
-    // the unmerged segment, size 8192. Total spilled records in the reduce
-    // is 32768 from the merge + 8192 unmerged segment = 40960 records
+    // combine, so all 24576 are spilled. Total spilled records in the reduce
+    // is 8192 records / map * 3 maps = 24576.
 
-    // Total: map + reduce = 49152 + 40960 = 90112
+    // Total: map + reduce = 49152 + 24576 = 73728
     // 3 files, 5120 = 5 * 1024 rec/file = 15360 input records
     // 4 records/line = 61440 output records
-    validateCounters(c1, 90112, 15360, 61440);
+    validateCounters(c1, 73728, 15360, 61440);
     validateFileCounters(c1, inputSize, 0, 0, 0);
     validateOldFileCounters(c1, inputSize, 61928, 0, 0);
   }
@@ -316,12 +314,12 @@ public class TestJobCounters {
     // 1st merge: read + write = 8192 * 4
     // 2nd merge: read + write = 8192 * 4
     // final merge: 0
-    // Total reduce: 65536
+    // Total reduce: 32768
 
-    // Total: map + reduce = 2^16 + 2^16 = 131072
+    // Total: map + reduce = 2^16 + 2^15 = 98304
     // 4 files, 5120 = 5 * 1024 rec/file = 15360 input records
     // 4 records/line = 81920 output records
-    validateCounters(c1, 131072, 20480, 81920);
+    validateCounters(c1, 98304, 20480, 81920);
     validateFileCounters(c1, inputSize, 0, 0, 0);
   }
 
@@ -349,7 +347,7 @@ public class TestJobCounters {
     // Total reduce: 45056
     // 5 files, 5120 = 5 * 1024 rec/file = 15360 input records
     // 4 records/line = 102400 output records
-    validateCounters(c1, 147456, 25600, 102400);
+    validateCounters(c1, 122880, 25600, 102400);
     validateFileCounters(c1, inputSize, 0, 0, 0);
   }
 
@@ -394,7 +392,7 @@ public class TestJobCounters {
         job, new Path(OUT_DIR, "outputN0"));
     assertTrue(job.waitForCompletion(true));
     final Counters c1 = Counters.downgrade(job.getCounters());
-    validateCounters(c1, 90112, 15360, 61440);
+    validateCounters(c1, 73728, 15360, 61440);
     validateFileCounters(c1, inputSize, 0, 0, 0);    
   }
 
@@ -416,7 +414,7 @@ public class TestJobCounters {
         job, new Path(OUT_DIR, "outputN1"));
     assertTrue(job.waitForCompletion(true));
     final Counters c1 = Counters.downgrade(job.getCounters());
-    validateCounters(c1, 131072, 20480, 81920);
+    validateCounters(c1, 98304, 20480, 81920);
     validateFileCounters(c1, inputSize, 0, 0, 0);
   }
 
@@ -439,7 +437,7 @@ public class TestJobCounters {
         job, new Path(OUT_DIR, "outputN2"));
     assertTrue(job.waitForCompletion(true));
     final Counters c1 = Counters.downgrade(job.getCounters());
-    validateCounters(c1, 147456, 25600, 102400);
+    validateCounters(c1, 122880, 25600, 102400);
     validateFileCounters(c1, inputSize, 0, 0, 0);
   }
 

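The revised counter expectations in TestJobCounters follow directly from the comments above: map-side spills are unchanged, while the reduce side now spills one 8192-record segment per map and nothing more. A standalone arithmetic check of the first case:

    public class SpillMath {
      public static void main(String[] args) {
        int mapSpills = 3 * (1 << 14); // 49152: each of 3 maps spills 2^14 records
        int reduceSpills = 3 * 8192;   // 24576: one merged segment per map
        // 73728, matching the new validateCounters expectation
        System.out.println(mapSpills + reduceSpills);
      }
    }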
Modified: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java Tue Aug 13 21:19:53 2013
@@ -63,7 +63,7 @@ public class TestKeyFieldBasedComparator
     conf.setOutputValueClass(LongWritable.class);
 
     conf.setNumMapTasks(1);
-    conf.setNumReduceTasks(2);
+    conf.setNumReduceTasks(1);
 
     conf.setOutputFormat(TextOutputFormat.class);
     conf.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
@@ -101,9 +101,7 @@ public class TestKeyFieldBasedComparator
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));
       String line = reader.readLine();
       //make sure we get what we expect as the first line, and also
-      //that we have two lines (both the lines must end up in the same
-      //reducer since the partitioner takes the same key spec for all
-      //lines
+      //that we have two lines
       if (expect == 1) {
         assertTrue(line.startsWith(line1));
       } else if (expect == 2) {

Modified: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java Tue Aug 13 21:19:53 2013
@@ -31,9 +31,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.mapred.LocalJobRunner;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -410,6 +410,7 @@ public class TestLocalRunner extends Tes
   }
 
   /** Test case for zero mappers */
+  @Test
   public void testEmptyMaps() throws Exception {
     Job job = Job.getInstance();
     Path outputPath = getOutputPath();
@@ -428,5 +429,145 @@ public class TestLocalRunner extends Tes
     boolean success = job.waitForCompletion(true);
     assertTrue("Empty job should work", success);
   }
+
+  /** @return the directory where the number files (mapper inputs) are written */
+  private Path getNumberDirPath() {
+    return new Path(getInputPath(), "numberfiles");
+  }
+
+  /**
+   * Write out an input file containing an integer.
+   *
+   * @param fileNum the file number to write to.
+   * @param value the value to write to the file
+   * @return the path of the written file.
+   */
+  private Path makeNumberFile(int fileNum, int value) throws IOException {
+    Path workDir = getNumberDirPath();
+    Path filePath = new Path(workDir, "file" + fileNum);
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    OutputStream os = fs.create(filePath);
+    BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+    w.write("" + value);
+    w.close();
+
+    return filePath;
+  }
+
+  /**
+   * Each record received by this mapper is a number 'n'.
+   * Emit the values [0..n-1]
+   */
+  public static class SequenceMapper
+      extends Mapper<LongWritable, Text, Text, NullWritable> {
+
+    public void map(LongWritable k, Text v, Context c)
+        throws IOException, InterruptedException {
+      int max = Integer.valueOf(v.toString());
+      for (int i = 0; i < max; i++) {
+        c.write(new Text("" + i), NullWritable.get());
+      }
+    }
+  }
+
+  private final static int NUMBER_FILE_VAL = 100;
+
+  /**
+   * Tally up the values and ensure that we got as much data
+   * out as we put in.
+   * Each mapper generated 'NUMBER_FILE_VAL' values (0..NUMBER_FILE_VAL-1).
+   * Verify that across all our reducers we got exactly this much
+   * data back.
+   */
+  private void verifyNumberJob(int numMaps) throws Exception {
+    Path outputDir = getOutputPath();
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    FileStatus [] stats = fs.listStatus(outputDir);
+    int valueSum = 0;
+    for (FileStatus f : stats) {
+      FSDataInputStream istream = fs.open(f.getPath());
+      BufferedReader r = new BufferedReader(new InputStreamReader(istream));
+      String line = null;
+      while ((line = r.readLine()) != null) {
+        valueSum += Integer.valueOf(line.trim());
+      }
+      r.close();
+    }
+
+    int maxVal = NUMBER_FILE_VAL - 1;
+    int expectedPerMapper = maxVal * (maxVal + 1) / 2;
+    int expectedSum = expectedPerMapper * numMaps;
+    LOG.info("expected sum: " + expectedSum + ", got " + valueSum);
+    assertEquals("Didn't get all our results back", expectedSum, valueSum);
+  }
+
+  /**
+   * Run a test which creates a SequenceMapper / IdentityReducer
+   * job over a set of generated number files.
+   */
+  private void doMultiReducerTest(int numMaps, int numReduces,
+      int parallelMaps, int parallelReduces) throws Exception {
+
+    Path in = getNumberDirPath();
+    Path out = getOutputPath();
+
+    // Clear data from any previous tests.
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    if (fs.exists(out)) {
+      fs.delete(out, true);
+    }
+
+    if (fs.exists(in)) {
+      fs.delete(in, true);
+    }
+
+    for (int i = 0; i < numMaps; i++) {
+      makeNumberFile(i, NUMBER_FILE_VAL);
+    }
+
+    Job job = Job.getInstance();
+    job.setNumReduceTasks(numReduces);
+
+    job.setMapperClass(SequenceMapper.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(NullWritable.class);
+    FileInputFormat.addInputPath(job, in);
+    FileOutputFormat.setOutputPath(job, out);
+
+    LocalJobRunner.setLocalMaxRunningMaps(job, parallelMaps);
+    LocalJobRunner.setLocalMaxRunningReduces(job, parallelReduces);
+
+    boolean result = job.waitForCompletion(true);
+    assertTrue("Job failed!!", result);
+
+    verifyNumberJob(numMaps);
+  }
+  
+  @Test
+  public void testOneMapMultiReduce() throws Exception {
+    doMultiReducerTest(1, 2, 1, 1);
+  }
+
+  @Test
+  public void testOneMapMultiParallelReduce() throws Exception {
+    doMultiReducerTest(1, 2, 1, 2);
+  }
+
+  @Test
+  public void testMultiMapOneReduce() throws Exception {
+    doMultiReducerTest(4, 1, 2, 1);
+  }
+
+  @Test
+  public void testMultiMapMultiReduce() throws Exception {
+    doMultiReducerTest(4, 4, 2, 2);
+  }
+
 }
 

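For the new TestLocalRunner tests, verifyNumberJob's expected total is a per-mapper Gauss sum. A quick standalone check of that arithmetic:

    public class GaussCheck {
      public static void main(String[] args) {
        int maxVal = 100 - 1;                      // NUMBER_FILE_VAL - 1
        int perMapper = maxVal * (maxVal + 1) / 2; // 4950 = 0 + 1 + ... + 99
        System.out.println(4 * perMapper);         // 19800 for the four-map tests
      }
    }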
Modified: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java Tue Aug 13 21:19:53 2013
@@ -56,7 +56,7 @@ public class TestMRKeyFieldBasedComparat
     conf.set("mapreduce.partition.keypartitioner.options", "-k1.1,1.1");
     conf.set(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, " ");
 
-    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 2,
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 1,
                 line1 +"\n" + line2 + "\n"); 
     job.setMapperClass(InverseMapper.class);
     job.setReducerClass(Reducer.class);

Propchange: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/dancing/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/math/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-4949/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/package.html
------------------------------------------------------------------------------
    svn:eol-style = native