You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/10/10 01:38:54 UTC
svn commit: r703292 - in /hadoop/core/trunk: CHANGES.txt
conf/hadoop-default.xml
src/mapred/org/apache/hadoop/mapred/JobInProgress.java
src/mapred/org/apache/hadoop/mapred/JobTracker.java
src/test/org/apache/hadoop/mapred/TestTaskLimits.java
Author: dhruba
Date: Thu Oct 9 16:38:53 2008
New Revision: 703292
URL: http://svn.apache.org/viewvc?rev=703292&view=rev
Log:
HADOOP-4018. The number of tasks for a single job cannot exceed a
pre-configured maximum value. (dhruba)
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/conf/hadoop-default.xml
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=703292&r1=703291&r2=703292&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Oct 9 16:38:53 2008
@@ -873,6 +873,9 @@
HADOOP-4267. Occasional exceptions during shutting down HSQLDB is logged
but not rethrown. (enis)
+ HADOOP-4018. The number of tasks for a single job cannot exceed a
+ pre-configured maximum value. (dhruba)
+
Release 0.18.2 - Unreleased
BUG FIXES
Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=703292&r1=703291&r2=703292&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Thu Oct 9 16:38:53 2008
@@ -968,6 +968,13 @@
</property>
<property>
+ <name>mapred.jobtracker.maxtasks.per.job</name>
+ <value>-1</value>
+ <description>The maximum number of tasks for a single job.
+ A value of -1 indicates that there is no maximum. </description>
+</property>
+
+<property>
<name>mapred.submit.replication</name>
<value>10</value>
<description>The replication level for submitted job files. This
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=703292&r1=703291&r2=703292&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Thu Oct 9 16:38:53 2008
@@ -52,7 +52,7 @@
* ***********************************************************
*/
class JobInProgress {
- private static final Log LOG = LogFactory.getLog(JobInProgress.class);
+ static final Log LOG = LogFactory.getLog(JobInProgress.class);
JobProfile profile;
JobStatus status;
@@ -363,6 +363,18 @@
splitFile.close();
}
numMapTasks = splits.length;
+
+
+ // if the number of splits is larger than a configured value
+ // then fail the job.
+ int maxTasks = jobtracker.getMaxTasksPerJob();
+ if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) {
+ throw new IOException(
+ "The number of tasks for this job " +
+ (numMapTasks + numReduceTasks) +
+ " exceeds the configured limit " + maxTasks);
+ }
+
maps = new TaskInProgress[numMapTasks];
for(int i=0; i < numMapTasks; ++i) {
inputLength += splits[i].getDataLength();
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=703292&r1=703291&r2=703292&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Thu Oct 9 16:38:53 2008
@@ -2721,5 +2721,12 @@
return (JobStatus[]) jobStatusList.toArray(
new JobStatus[jobStatusList.size()]);
}
+
+ /**
+ * Returns the configured maximum number of tasks for a single job
+ */
+ int getMaxTasksPerJob() {
+ return conf.getInt("mapred.jobtracker.maxtasks.per.job", -1);
+ }
}
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java?rev=703292&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java Thu Oct 9 16:38:53 2008
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import junit.framework.TestCase;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+
+/**
+ * A JUnit test to test configured task limits.
+ */
+public class TestTaskLimits extends TestCase {
+
+ {
+ ((Log4JLogger)JobInProgress.LOG).getLogger().setLevel(Level.ALL);
+ }
+
+ private static final Log LOG =
+ LogFactory.getLog(TestMiniMRWithDFS.class.getName());
+
+ static final int NUM_MAPS = 5;
+ static final int NUM_SAMPLES = 100;
+
+ public static class TestResult {
+ public String output;
+ public RunningJob job;
+ TestResult(RunningJob job, String output) {
+ this.job = job;
+ this.output = output;
+ }
+ }
+
+ static void runPI(MiniMRCluster mr, JobConf jobconf) throws IOException {
+ LOG.info("runPI");
+ double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, jobconf);
+ double error = Math.abs(Math.PI - estimate);
+ System.out.println("PI estimation " + error);
+ }
+
+ /**
+ * Run the pi test with a specific value of
+ * mapred.jobtracker.maxtasks.per.job. Returns true if the job succeeded.
+ */
+ private boolean runOneTest(int maxTasks) throws IOException {
+ MiniDFSCluster dfs = null;
+ MiniMRCluster mr = null;
+ FileSystem fileSys = null;
+ boolean success = false;
+ try {
+ final int taskTrackers = 2;
+
+ Configuration conf = new Configuration();
+ conf.setInt("mapred.jobtracker.maxtasks.per.job", maxTasks);
+ dfs = new MiniDFSCluster(conf, 4, true, null);
+ fileSys = dfs.getFileSystem();
+ JobConf jconf = new JobConf(conf);
+ mr = new MiniMRCluster(0, 0, taskTrackers, fileSys.getUri().toString(), 1,
+ null, null, null, jconf);
+
+ JobConf jc = mr.createJobConf();
+ try {
+ runPI(mr, jc);
+ success = true;
+ } catch (IOException e) {
+ success = false;
+ }
+ } finally {
+ if (dfs != null) { dfs.shutdown(); }
+ if (mr != null) { mr.shutdown(); }
+ }
+ return success;
+ }
+
+ public void testTaskLimits() throws IOException {
+
+ System.out.println("Job 1 running with max set to 2");
+ boolean status = runOneTest(2);
+ assertTrue(status == false);
+ System.out.println("Job 1 failed as expected.");
+
+ // verify that checking this limit works well. The job
+ // needs 5 mappers and we set the limit to 7.
+ System.out.println("Job 2 running with max set to 7.");
+ status = runOneTest(7);
+ assertTrue(status == true);
+ System.out.println("Job 2 succeeded as expected.");
+
+ System.out.println("Job 3 running with max disabled.");
+ status = runOneTest(-1);
+ assertTrue(status == true);
+ System.out.println("Job 3 succeeded as expected.");
+ }
+}