Posted to mapreduce-commits@hadoop.apache.org by sa...@apache.org on 2013/11/14 09:06:51 UTC

svn commit: r1541846 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/ap...

Author: sandy
Date: Thu Nov 14 08:06:50 2013
New Revision: 1541846

URL: http://svn.apache.org/r1541846
Log:
MAPREDUCE-5481. Enable uber jobs to have multiple reducers (Sandy Ryza)
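
In uber mode the MapReduce ApplicationMaster runs all of a small job's tasks inside its own JVM instead of requesting separate containers; before this commit the uberization check only accepted a maxreduces setting of 0 or 1, so in effect a job could be uberized with at most one reducer. As a hedged sketch of the client-side configuration this change makes useful (the MRJobConfig constants are real; the class name, the limit of 3, and the job name are example values only):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class UberJobConfigExample {
      public static Job configure() throws IOException {
        Configuration conf = new Configuration();
        // Ask the AM to run this job's tasks in-process ("uber" mode)...
        conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);    // mapreduce.job.ubertask.enable
        // ...and, with this patch, allow up to 3 sub-reducers (example value).
        conf.setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 3);       // mapreduce.job.ubertask.maxreduces
        return Job.getInstance(conf, "small-uber-job");
      }
    }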

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1541846&r1=1541845&r2=1541846&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Thu Nov 14 08:06:50 2013
@@ -39,6 +39,8 @@ Release 2.3.0 - UNRELEASED
     MAPREDUCE-5624 Move grizzly-test and junit dependencies to test scope
     (Ted Yu via stevel)
 
+    MAPREDUCE-5481. Enable uber jobs to have multiple reducers (Sandy Ryza)
+
   OPTIMIZATIONS
 
     MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java?rev=1541846&r1=1541845&r2=1541846&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java Thu Nov 14 08:06:50 2013
@@ -20,7 +20,9 @@ package org.apache.hadoop.mapred;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -35,6 +37,7 @@ import org.apache.hadoop.fs.UnsupportedF
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@@ -168,6 +171,10 @@ public class LocalContainerLauncher exte
     public void run() {
       ContainerLauncherEvent event = null;
 
+      // Collect locations of map outputs to give to reduces
+      Map<TaskAttemptID, MapOutputFile> localMapFiles =
+          new HashMap<TaskAttemptID, MapOutputFile>();
+      
       // _must_ either run subtasks sequentially or accept expense of new JVMs
       // (i.e., fork()), else will get weird failures when maps try to create/
       // write same dirname or filename:  no chdir() in Java
@@ -223,7 +230,7 @@ public class LocalContainerLauncher exte
               context.getEventHandler().handle(jce);
             }
             runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
-                       (numReduceTasks > 0));
+                       (numReduceTasks > 0), localMapFiles);
             
           } catch (RuntimeException re) {
             JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
@@ -265,7 +272,8 @@ public class LocalContainerLauncher exte
                             final TaskType taskType,
                             TaskAttemptId attemptID,
                             final int numMapTasks,
-                            boolean renameOutputs)
+                            boolean renameOutputs,
+                            Map<TaskAttemptID, MapOutputFile> localMapFiles)
     throws RuntimeException, IOException {
       org.apache.hadoop.mapred.TaskAttemptID classicAttemptID =
           TypeConverter.fromYarn(attemptID);
@@ -309,7 +317,9 @@ public class LocalContainerLauncher exte
           map.run(conf, umbilical);
 
           if (renameOutputs) {
-            renameMapOutputForReduce(conf, attemptID, map.getMapOutputFile());
+            MapOutputFile renamed = renameMapOutputForReduce(conf, attemptID,
+                map.getMapOutputFile());
+            localMapFiles.put(classicAttemptID, renamed);
           }
           relocalize();
 
@@ -335,10 +345,11 @@ public class LocalContainerLauncher exte
           conf.set(MRConfig.MASTER_ADDRESS, "local");  // bypass shuffle
 
           ReduceTask reduce = (ReduceTask)task;
+          reduce.setLocalMapFiles(localMapFiles);
           reduce.setConf(conf);          
 
           reduce.run(conf, umbilical);
-          //relocalize();  // needed only if more than one reducer supported (is MAPREDUCE-434 fixed yet?)
+          relocalize();
         }
 
       } catch (FSError e) {
@@ -387,15 +398,16 @@ public class LocalContainerLauncher exte
      * so there are no particular compatibility issues.)
      */
     @SuppressWarnings("deprecation")
-    private void renameMapOutputForReduce(JobConf conf, TaskAttemptId mapId,
-                                          MapOutputFile subMapOutputFile)
-    throws IOException {
+    private MapOutputFile renameMapOutputForReduce(JobConf conf,
+        TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
       FileSystem localFs = FileSystem.getLocal(conf);
       // move map output to reduce input
       Path mapOut = subMapOutputFile.getOutputFile();
       FileStatus mStatus = localFs.getFileStatus(mapOut);      
       Path reduceIn = subMapOutputFile.getInputFileForWrite(
           TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
+      Path mapOutIndex = new Path(mapOut.toString() + ".index");
+      Path reduceInIndex = new Path(reduceIn.toString() + ".index");
       if (LOG.isDebugEnabled()) {
         LOG.debug("Renaming map output file for task attempt "
             + mapId.toString() + " from original location " + mapOut.toString()
@@ -407,6 +419,10 @@ public class LocalContainerLauncher exte
       }
       if (!localFs.rename(mapOut, reduceIn))
         throw new IOException("Couldn't rename " + mapOut);
+      if (!localFs.rename(mapOutIndex, reduceInIndex))
+        throw new IOException("Couldn't rename " + mapOutIndex);
+      
+      return new RenamedMapOutputFile(reduceIn);
     }
 
     /**
@@ -441,5 +457,70 @@ public class LocalContainerLauncher exte
     }
 
   } // end SubtaskRunner
+  
+  private static class RenamedMapOutputFile extends MapOutputFile {
+    private Path path;
+    
+    public RenamedMapOutputFile(Path path) {
+      this.path = path;
+    }
+    
+    @Override
+    public Path getOutputFile() throws IOException {
+      return path;
+    }
+
+    @Override
+    public Path getOutputFileForWrite(long size) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Path getOutputFileForWriteInVolume(Path existing) {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Path getOutputIndexFile() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Path getOutputIndexFileForWrite(long size) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Path getOutputIndexFileForWriteInVolume(Path existing) {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Path getSpillFile(int spillNumber) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Path getSpillFileForWrite(int spillNumber, long size)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Path getSpillIndexFile(int spillNumber) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Path getSpillIndexFileForWrite(int spillNumber, long size)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Path getInputFile(int mapId) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Path getInputFileForWrite(TaskID mapId, long size)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public void removeAll() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  }
 
 }
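
The LocalContainerLauncher changes above implement a local, in-process shuffle: each finished map renames its output into reduce-input position, records the final location in localMapFiles keyed by attempt ID, and each in-process reducer is then handed that map via setLocalMapFiles() so it reads the files straight off local disk rather than fetching them over HTTP. (RenamedMapOutputFile implements only getOutputFile() because that final path is all the local fetch needs; every other accessor fails fast with UnsupportedOperationException.) A minimal standalone sketch of the bookkeeping pattern, using plain Strings as hypothetical stand-ins for the real TaskAttemptID and MapOutputFile types:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical illustration only; not the real Hadoop classes.
    class LocalMapOutputRegistry {
      // attempt id -> final (renamed) map output path
      private final Map<String, String> localMapFiles =
          new HashMap<String, String>();

      // Called once per finished map, after the rename into reduce-input
      // position (cf. renameMapOutputForReduce above).
      void recordMapOutput(String attemptId, String renamedOutputPath) {
        localMapFiles.put(attemptId, renamedOutputPath);
      }

      // Handed to each in-process reducer (cf. reduce.setLocalMapFiles(...)),
      // which reads the files directly instead of shuffling over HTTP.
      Map<String, String> snapshotForReduce() {
        return new HashMap<String, String>(localMapFiles);
      }
    }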

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1541846&r1=1541845&r2=1541846&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Thu Nov 14 08:06:50 2013
@@ -1173,11 +1173,7 @@ public class JobImpl implements org.apac
     // these are no longer "system" settings, necessarily; user may override
     int sysMaxMaps = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
 
-    //FIXME: handling multiple reduces within a single AM does not seem to
-    //work.
     int sysMaxReduces = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
-    boolean isValidUberMaxReduces = (sysMaxReduces == 0)
-        || (sysMaxReduces == 1);
 
     long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
         fs.getDefaultBlockSize(this.remoteJobSubmitDir)); // FIXME: this is wrong; get FS from
@@ -1225,7 +1221,7 @@ public class JobImpl implements org.apac
     // and thus requires sequential execution.
     isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
         && smallInput && smallMemory && smallCpu 
-        && notChainJob && isValidUberMaxReduces;
+        && notChainJob;
 
     if (isUber) {
       LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"
@@ -1259,8 +1255,6 @@ public class JobImpl implements org.apac
         msg.append(" too much RAM;");
       if (!notChainJob)
         msg.append(" chainjob;");
-      if (!isValidUberMaxReduces)
-        msg.append(" not supported uber max reduces");
       LOG.info(msg.toString());
     }
   }
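
The net effect of the JobImpl change above is that the configured maxreduces ceiling is now honored as-is rather than being forced to 0 or 1. A condensed, hedged paraphrase of the decision (the real method also checks input bytes, memory, CPU, and chain jobs; the class and method names here are illustrative only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class UberDecisionSketch {
      static boolean mayUberize(Configuration conf,
                                int numMapTasks, int numReduceTasks) {
        boolean uberEnabled =
            conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
        int sysMaxMaps = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
        int sysMaxReduces =
            conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
        // Before this patch sysMaxReduces itself had to be 0 or 1; now any
        // configured ceiling counts, so maxreduces=3 admits a 3-reducer job.
        return uberEnabled
            && numMapTasks <= sysMaxMaps
            && numReduceTasks <= sysMaxReduces;
      }
    }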

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java?rev=1541846&r1=1541845&r2=1541846&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java Thu Nov 14 08:06:50 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,12 +40,14 @@ import org.junit.Test;
 public class TestUberAM extends TestMRJobs {
 
   private static final Log LOG = LogFactory.getLog(TestUberAM.class);
-
+  private int numSleepReducers;
+  
   @BeforeClass
   public static void setup() throws IOException {
     TestMRJobs.setup();
     if (mrCluster != null) {
     	mrCluster.getConfig().setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+    	mrCluster.getConfig().setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 3);
     }
   }
 
@@ -52,8 +55,19 @@ public class TestUberAM extends TestMRJo
   @Test
   public void testSleepJob()
   throws IOException, InterruptedException, ClassNotFoundException {
+    numSleepReducers = 1;
     if (mrCluster != null) {
-    	mrCluster.getConfig().setInt("TestMRJobs.testSleepJob.reduces", 1);
+    	mrCluster.getConfig().setInt("TestMRJobs.testSleepJob.reduces", numSleepReducers);
+    }
+    super.testSleepJob();
+  }
+  
+  @Test
+  public void testSleepJobWithMultipleReducers()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    numSleepReducers = 3;
+    if (mrCluster != null) {
+      mrCluster.getConfig().setInt("TestMRJobs.testSleepJob.reduces", numSleepReducers);
     }
     super.testSleepJob();
   }
@@ -67,7 +81,7 @@ public class TestUberAM extends TestMRJo
         .getValue());
     Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
         .getValue());
-    Assert.assertEquals(1,
+    Assert.assertEquals(numSleepReducers,
         counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue());
     Assert
         .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
@@ -76,11 +90,11 @@ public class TestUberAM extends TestMRJo
         .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
             && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
 
-    Assert.assertEquals(3, counters.findCounter(JobCounter.NUM_UBER_SUBMAPS)
-        .getValue());
-    Assert.assertEquals(1, counters.findCounter(JobCounter.NUM_UBER_SUBREDUCES)
-        .getValue());
-    Assert.assertEquals(4,
+    Assert.assertEquals(3,
+        counters.findCounter(JobCounter.NUM_UBER_SUBMAPS).getValue());
+    Assert.assertEquals(numSleepReducers,
+        counters.findCounter(JobCounter.NUM_UBER_SUBREDUCES).getValue());
+    Assert.assertEquals(3 + numSleepReducers,
         counters.findCounter(JobCounter.TOTAL_LAUNCHED_UBERTASKS).getValue());
   }
 
@@ -138,8 +152,10 @@ public class TestUberAM extends TestMRJo
 
     TaskCompletionEvent[] events = job.getTaskCompletionEvents(0, 2);
     Assert.assertEquals(1, events.length);
-    Assert.assertEquals(TaskCompletionEvent.Status.TIPFAILED,
-        events[0].getStatus());
+    // TIPFAILED if it comes from the AM, FAILED if it comes from the JHS
+    TaskCompletionEvent.Status status = events[0].getStatus();
+    Assert.assertTrue(status == TaskCompletionEvent.Status.FAILED ||
+        status == TaskCompletionEvent.Status.TIPFAILED);
     Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
     
     //Disabling till UberAM honors MRJobConfig.MAP_MAX_ATTEMPTS
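
With multiple reducers allowed, the counter assertions in the test above reduce to simple arithmetic over the task counts. A quick worked example for the three-reducer sleep job, with values taken from the assertions:

    // testSleepJobWithMultipleReducers: 3 maps, numSleepReducers = 3.
    int numMaps = 3;                                    // TOTAL_LAUNCHED_MAPS
    int numSleepReducers = 3;                           // TOTAL_LAUNCHED_REDUCES
    int expectedSubMaps = numMaps;                      // NUM_UBER_SUBMAPS    == 3
    int expectedSubReduces = numSleepReducers;          // NUM_UBER_SUBREDUCES == 3
    int expectedUberTasks =
        numMaps + numSleepReducers;                     // TOTAL_LAUNCHED_UBERTASKS == 6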