You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/04/19 18:22:23 UTC
svn commit: r1328031 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java...
Author: bobby
Date: Thu Apr 19 16:22:22 2012
New Revision: 1328031
URL: http://svn.apache.org/viewvc?rev=1328031&view=rev
Log:
MAPREDUCE-4159. Job is running in Uber mode after setting "mapreduce.job.ubertask.maxreduces" to zero (Devaraj K via bobby)
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1328031&r1=1328030&r2=1328031&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Apr 19 16:22:22 2012
@@ -370,6 +370,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4074. Client continuously retries to RM When RM goes down
before launching Application Master (xieguiming via tgraves)
+ MAPREDUCE-4159. Job is running in Uber mode after setting
+ "mapreduce.job.ubertask.maxreduces" to zero (Devaraj K via bobby)
+
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1328031&r1=1328030&r2=1328031&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Thu Apr 19 16:22:22 2012
@@ -822,9 +822,9 @@ public class JobImpl implements org.apac
//FIXME: handling multiple reduces within a single AM does not seem to
//work.
- // int sysMaxReduces =
- // job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
- int sysMaxReduces = 1;
+ int sysMaxReduces = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
+ boolean isValidUberMaxReduces = (sysMaxReduces == 0)
+ || (sysMaxReduces == 1);
long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
fs.getDefaultBlockSize()); // FIXME: this is wrong; get FS from
@@ -856,7 +856,7 @@ public class JobImpl implements org.apac
// while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks
// and thus requires sequential execution.
isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
- && smallInput && smallMemory && notChainJob;
+ && smallInput && smallMemory && notChainJob && isValidUberMaxReduces;
if (isUber) {
LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"
@@ -889,7 +889,9 @@ public class JobImpl implements org.apac
if (!smallMemory)
msg.append(" too much RAM;");
if (!notChainJob)
- msg.append(" chainjob");
+ msg.append(" chainjob;");
+ if (!isValidUberMaxReduces)
+ msg.append(" not supported uber max reduces");
LOG.info(msg.toString());
}
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1328031&r1=1328030&r2=1328031&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Thu Apr 19 16:22:22 2012
@@ -37,14 +37,20 @@ import org.apache.hadoop.mapreduce.MRJob
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
@@ -233,4 +239,69 @@ public class TestJobImpl {
Assert.assertTrue(job5.checkAccess(ugi1, null));
Assert.assertTrue(job5.checkAccess(ugi2, null));
}
+ @Test
+ public void testUberDecision() throws Exception {
+
+ // with default values (uber mode not explicitly enabled), no of maps is 2;
+ Configuration conf = new Configuration();
+ boolean isUber = testUberDecision(conf);
+ Assert.assertFalse(isUber);
+
+ // enable uber mode, no of maps is 2; expected to be uberized
+ conf = new Configuration();
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+ isUber = testUberDecision(conf);
+ Assert.assertTrue(isUber);
+
+ // enable uber mode, no of maps is 2, no of reduces is 1 and uber task max
+ // reduces is 0; expected NOT to be uberized (the MAPREDUCE-4159 case)
+ conf = new Configuration();
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+ conf.setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 0);
+ conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+ isUber = testUberDecision(conf);
+ Assert.assertFalse(isUber);
+
+ // enable uber mode, no of maps is 2, no of reduces is 1 and uber task max
+ // reduces is 1; expected to be uberized
+ conf = new Configuration();
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+ conf.setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
+ conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+ isUber = testUberDecision(conf);
+ Assert.assertTrue(isUber);
+
+ // enable uber mode, no of maps is 2 and uber task max maps is 1; expected NOT to be uberized
+ conf = new Configuration();
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+ conf.setInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 1);
+ isUber = testUberDecision(conf);
+ Assert.assertFalse(isUber);
+ }
+
+ private boolean testUberDecision(Configuration conf) { // helper: runs InitTransition on a JobImpl built from conf and returns job.isUber()
+ JobID jobID = JobID.forName("job_1234567890000_0001"); // fixed job id string for the test
+ JobId jobId = TypeConverter.toYarn(jobID);
+ MRAppMetrics mrAppMetrics = MRAppMetrics.create();
+ JobImpl job = new JobImpl(jobId, Records
+ .newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
+ null, mock(JobTokenSecretManager.class), null, null, null,
+ mrAppMetrics, mock(OutputCommitter.class), true, null, 0, null, null); // other collaborators are mocks, nulls or literal values
+ InitTransition initTransition = getInitTransition(); // variant whose createSplits is overridden to return two splits
+ JobEvent mockJobEvent = mock(JobEvent.class);
+ initTransition.transition(job, mockJobEvent); // the uber decision is read back from the job after this call
+ boolean isUber = job.isUber();
+ return isUber;
+ }
+
+ private InitTransition getInitTransition() { // helper: InitTransition with createSplits replaced by a fixed result
+ InitTransition initTransition = new InitTransition() {
+ @Override
+ protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) { // job and jobId are ignored by this override
+ return new TaskSplitMetaInfo[] { new TaskSplitMetaInfo(), // always two default-constructed entries,
+ new TaskSplitMetaInfo() }; // which the test comments count as "no of maps is 2"
+ }
+ };
+ return initTransition;
+ }
}