You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/04/19 18:22:23 UTC

svn commit: r1328031 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java...

Author: bobby
Date: Thu Apr 19 16:22:22 2012
New Revision: 1328031

URL: http://svn.apache.org/viewvc?rev=1328031&view=rev
Log:
MAPREDUCE-4159. Job is running in Uber mode after setting "mapreduce.job.ubertask.maxreduces" to zero (Devaraj K via bobby)

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1328031&r1=1328030&r2=1328031&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Apr 19 16:22:22 2012
@@ -370,6 +370,9 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4074. Client continuously retries to RM When RM goes down 
     before launching Application Master (xieguiming via tgraves)
 
+    MAPREDUCE-4159. Job is running in Uber mode after setting 
+    "mapreduce.job.ubertask.maxreduces" to zero (Devaraj K via bobby)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1328031&r1=1328030&r2=1328031&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Thu Apr 19 16:22:22 2012
@@ -822,9 +822,9 @@ public class JobImpl implements org.apac
 
     //FIXME: handling multiple reduces within a single AM does not seem to
     //work.
-    // int sysMaxReduces =
-    //     job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
-    int sysMaxReduces = 1;
+    int sysMaxReduces = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
+    boolean isValidUberMaxReduces = (sysMaxReduces == 0)
+        || (sysMaxReduces == 1);
 
     long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
         fs.getDefaultBlockSize()); // FIXME: this is wrong; get FS from
@@ -856,7 +856,7 @@ public class JobImpl implements org.apac
     // while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks
     // and thus requires sequential execution.
     isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
-        && smallInput && smallMemory && notChainJob;
+        && smallInput && smallMemory && notChainJob && isValidUberMaxReduces;
 
     if (isUber) {
       LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"
@@ -889,7 +889,9 @@ public class JobImpl implements org.apac
       if (!smallMemory)
         msg.append(" too much RAM;");
       if (!notChainJob)
-        msg.append(" chainjob");
+        msg.append(" chainjob;");
+      if (!isValidUberMaxReduces)
+        msg.append(" not supported uber max reduces");
       LOG.info(msg.toString());
     }
   }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1328031&r1=1328030&r2=1328031&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Thu Apr 19 16:22:22 2012
@@ -37,14 +37,20 @@ import org.apache.hadoop.mapreduce.MRJob
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -233,4 +239,69 @@ public class TestJobImpl {
     Assert.assertTrue(job5.checkAccess(ugi1, null));
     Assert.assertTrue(job5.checkAccess(ugi2, null));
   }
+  @Test
+  public void testUberDecision() throws Exception {
+
+    // with default values, no of maps is 2
+    Configuration conf = new Configuration();
+    boolean isUber = testUberDecision(conf);
+    Assert.assertFalse(isUber);
+
+    // enable uber mode, no of maps is 2
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+    isUber = testUberDecision(conf);
+    Assert.assertTrue(isUber);
+
+    // enable uber mode, no of maps is 2, no of reduces is 1 and uber task max
+    // reduces is 0
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+    conf.setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 0);
+    conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+    isUber = testUberDecision(conf);
+    Assert.assertFalse(isUber);
+
+    // enable uber mode, no of maps is 2, no of reduces is 1 and uber task max
+    // reduces is 1
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+    conf.setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
+    conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+    isUber = testUberDecision(conf);
+    Assert.assertTrue(isUber);
+
+    // enable uber mode, no of maps is 2 and uber task max maps is 0
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+    conf.setInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 1);
+    isUber = testUberDecision(conf);
+    Assert.assertFalse(isUber);
+  }
+
+  private boolean testUberDecision(Configuration conf) {
+    JobID jobID = JobID.forName("job_1234567890000_0001");
+    JobId jobId = TypeConverter.toYarn(jobID);
+    MRAppMetrics mrAppMetrics = MRAppMetrics.create();
+    JobImpl job = new JobImpl(jobId, Records
+        .newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
+        null, mock(JobTokenSecretManager.class), null, null, null,
+        mrAppMetrics, mock(OutputCommitter.class), true, null, 0, null, null);
+    InitTransition initTransition = getInitTransition();
+    JobEvent mockJobEvent = mock(JobEvent.class);
+    initTransition.transition(job, mockJobEvent);
+    boolean isUber = job.isUber();
+    return isUber;
+  }
+
+  private InitTransition getInitTransition() {
+    InitTransition initTransition = new InitTransition() {
+      @Override
+      protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
+        return new TaskSplitMetaInfo[] { new TaskSplitMetaInfo(),
+            new TaskSplitMetaInfo() };
+      }
+    };
+    return initTransition;
+  }
 }