You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/08/13 20:34:33 UTC
svn commit: r1372541 - in /hadoop/common/branches/branch-1: ./ src/mapred/
src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/
src/test/org/apache/hadoop/mapreduce/
Author: tomwhite
Date: Mon Aug 13 18:34:33 2012
New Revision: 1372541
URL: http://svn.apache.org/viewvc?rev=1372541&view=rev
Log:
MAPREDUCE-4488. Port MAPREDUCE-463 (The job setup and cleanup tasks should be optional) to branch-1.
Added:
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java (with props)
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/mapred/mapred-default.xml
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/Job.java
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1372541&r1=1372540&r2=1372541&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Mon Aug 13 18:34:33 2012
@@ -27,6 +27,9 @@ Release 1.2.0 - unreleased
module external to HDFS to specify how HDFS blocks should be placed.
(Sumadhur Reddy Bolli via szetszwo)
+ MAPREDUCE-4488. Port MAPREDUCE-463 (The job setup and cleanup tasks
+ should be optional) to branch-1. (tomwhite)
+
IMPROVEMENTS
HDFS-3515. Port HDFS-1457 to branch-1. (eli)
Modified: hadoop/common/branches/branch-1/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/mapred-default.xml?rev=1372541&r1=1372540&r2=1372541&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-1/src/mapred/mapred-default.xml Mon Aug 13 18:34:33 2012
@@ -35,6 +35,14 @@
</description>
</property>
+<property>
+ <name>mapred.committer.job.setup.cleanup.needed</name>
+ <value>true</value>
+ <description>Whether the job needs the job-setup and job-cleanup tasks.
+ Set to false to skip both for the job; defaults to true.
+ </description>
+</property>
+
<!-- i/o properties -->
<property>
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1372541&r1=1372540&r2=1372541&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Mon Aug 13 18:34:33 2012
@@ -131,6 +131,7 @@ public class JobInProgress {
private volatile boolean launchedSetup = false;
private volatile boolean jobKilled = false;
private volatile boolean jobFailed = false;
+ private boolean jobSetupCleanupNeeded = true;
JobPriority priority = JobPriority.NORMAL;
final JobTracker jobtracker;
@@ -361,6 +362,8 @@ public class JobInProgress {
this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
(numMapTasks + numReduceTasks + 10);
+ JobContext jobContext = new JobContext(conf, jobId);
+ this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
try {
this.userUGI = UserGroupInformation.getCurrentUser();
} catch (IOException ie){
@@ -449,6 +452,9 @@ public class JobInProgress {
this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
(numMapTasks + numReduceTasks + 10);
+
+ JobContext jobContext = new JobContext(conf, jobId);
+ this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
// Construct the jobACLs
status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));
@@ -757,7 +763,35 @@ public class JobInProgress {
// ... use the same for estimating the total output of all maps
resourceEstimator.setThreshhold(completedMapsForReduceSlowstart);
+
+ initSetupCleanupTasks(jobFile);
+
+ synchronized(jobInitKillStatus){
+ jobInitKillStatus.initDone = true;
+
+ // set this before the throw to make sure cleanup works properly
+ tasksInited = true;
+
+ if(jobInitKillStatus.killed) {
+ throw new KillInterruptedException("Job " + jobId + " killed in init");
+ }
+ }
+
+ JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime,
+ numMapTasks, numReduceTasks);
+ // if setup is not needed, mark it complete
+ if (!jobSetupCleanupNeeded) {
+ setupComplete();
+ }
+ }
+
+ private void initSetupCleanupTasks(String jobFile) {
+ if (!jobSetupCleanupNeeded) {
+ // nothing to initialize
+ return;
+ }
+
// create cleanup two cleanup tips, one map and one reduce.
cleanup = new TaskInProgress[2];
@@ -787,25 +821,23 @@ public class JobInProgress {
numReduceTasks + 1, jobtracker, conf, this, 1);
setup[1].setJobSetupTask();
- synchronized(jobInitKillStatus){
- jobInitKillStatus.initDone = true;
-
- // set this before the throw to make sure cleanup works properly
- tasksInited = true;
-
- if(jobInitKillStatus.killed) {
- throw new KillInterruptedException("Job " + jobId + " killed in init");
- }
- }
-
- JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime,
- numMapTasks, numReduceTasks);
-
// Log the number of map and reduce tasks
LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks
+ " map tasks and " + numReduceTasks + " reduce tasks.");
}
+ private void setupComplete() {
+ status.setSetupProgress(1.0f);
+ if (maps.length == 0 && reduces.length == 0 && !jobSetupCleanupNeeded) {
+ jobComplete();
+ return;
+ }
+ if (this.status.getRunState() == JobStatus.PREP) {
+ this.status.setRunState(JobStatus.RUNNING);
+ JobHistory.JobInfo.logStarted(profile.getJobID());
+ }
+ }
+
TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId)
throws IOException {
TaskSplitMetaInfo[] allTaskSplitMetaInfo =
@@ -2622,13 +2654,7 @@ public class JobInProgress {
if (tip.isJobSetupTask()) {
// setup task has finished. kill the extra setup tip
killSetupTip(!tip.isMapTask());
- // Job can start running now.
- this.status.setSetupProgress(1.0f);
- // move the job to running state if the job is in prep state
- if (this.status.getRunState() == JobStatus.PREP) {
- changeStateTo(JobStatus.RUNNING);
- JobHistory.JobInfo.logStarted(profile.getJobID());
- }
+ setupComplete();
} else if (tip.isJobCleanupTask()) {
// cleanup task has finished. Kill the extra cleanup tip
if (tip.isMapTask()) {
@@ -2687,6 +2713,9 @@ public class JobInProgress {
}
}
}
+ if (!jobSetupCleanupNeeded && canLaunchJobCleanupTask()) {
+ jobComplete();
+ }
return true;
}
@@ -2747,7 +2776,8 @@ public class JobInProgress {
//
// All tasks are complete, then the job is done!
//
- if (this.status.getRunState() == JobStatus.RUNNING ) {
+ if (this.status.getRunState() == JobStatus.RUNNING ||
+ this.status.getRunState() == JobStatus.PREP) {
changeStateTo(JobStatus.SUCCEEDED);
this.status.setCleanupProgress(1.0f);
if (maps.length == 0) {
@@ -2881,6 +2911,9 @@ public class JobInProgress {
for (int i = 0; i < reduces.length; i++) {
reduces[i].kill();
}
+ if (!jobSetupCleanupNeeded) {
+ terminateJob(jobTerminationState);
+ }
}
}
@@ -3169,7 +3202,9 @@ public class JobInProgress {
}
boolean isSetupFinished() {
- if (setup[0].isComplete() || setup[0].isFailed() || setup[1].isComplete()
+ // if there is no setup task to launch, consider setup finished.
+ if ((tasksInited && setup.length == 0) ||
+ setup[0].isComplete() || setup[0].isFailed() || setup[1].isComplete()
|| setup[1].isFailed()) {
return true;
}
@@ -3283,10 +3318,12 @@ public class JobInProgress {
*/
public synchronized TaskInProgress getTaskInProgress(TaskID tipid) {
if (tipid.isMap()) {
- if (tipid.equals(cleanup[0].getTIPId())) { // cleanup map tip
+ // cleanup map tip
+ if (cleanup.length > 0 && tipid.equals(cleanup[0].getTIPId())) {
return cleanup[0];
}
- if (tipid.equals(setup[0].getTIPId())) { //setup map tip
+ // setup map tip
+ if (setup.length > 0 && tipid.equals(setup[0].getTIPId())) {
return setup[0];
}
for (int i = 0; i < maps.length; i++) {
@@ -3295,10 +3332,12 @@ public class JobInProgress {
}
}
} else {
- if (tipid.equals(cleanup[1].getTIPId())) { // cleanup reduce tip
+ // cleanup reduce tip
+ if (cleanup.length > 0 && tipid.equals(cleanup[1].getTIPId())) {
return cleanup[1];
}
- if (tipid.equals(setup[1].getTIPId())) { //setup reduce tip
+ // setup reduce tip
+ if (setup.length > 0 && tipid.equals(setup[1].getTIPId())) {
return setup[1];
}
for (int i = 0; i < reduces.length; i++) {
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/Job.java?rev=1372541&r1=1372540&r2=1372541&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/Job.java Mon Aug 13 18:34:33 2012
@@ -349,6 +349,18 @@ public class Job extends JobContext {
}
/**
+ * Specify whether job-setup and job-cleanup are needed for the job.
+ *
+ * @param needed If <code>true</code>, the job-setup and job-cleanup of the
+ * {@link OutputCommitter} are run for the job;
+ * if <code>false</code>, both are skipped.
+ */
+ public void setJobSetupCleanupNeeded(boolean needed) {
+ ensureState(JobState.DEFINE);
+ conf.setBoolean("mapred.committer.job.setup.cleanup.needed", needed);
+ }
+
+ /**
* Get the URL where some job progress information will be displayed.
*
* @return the URL where some job progress information will be displayed.
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java?rev=1372541&r1=1372540&r2=1372541&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java Mon Aug 13 18:34:33 2012
@@ -275,4 +275,13 @@ public class JobContext {
public RawComparator<?> getGroupingComparator() {
return conf.getOutputValueGroupingComparator();
}
+
+ /**
+ * Get whether job-setup and job-cleanup are needed for the job.
+ *
+ * @return whether job-setup and job-cleanup are needed; defaults to true
+ */
+ public boolean getJobSetupCleanupNeeded() {
+ return conf.getBoolean("mapred.committer.job.setup.cleanup.needed", true);
+ }
}
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java?rev=1372541&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java Mon Aug 13 18:34:33 2012
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+public class TestNoJobSetupCleanup extends HadoopTestCase {
+ private static String TEST_ROOT_DIR =
+ new File(System.getProperty("test.build.data","/tmp"))
+ .toURI().toString().replace(' ', '+');
+ private final Path inDir = new Path(TEST_ROOT_DIR, "./wc/input");
+ private final Path outDir = new Path(TEST_ROOT_DIR, "./wc/output");
+
+ public TestNoJobSetupCleanup() throws IOException {
+ super(HadoopTestCase.CLUSTER_MR , HadoopTestCase.LOCAL_FS, 2, 2);
+ }
+
+ private Job submitAndValidateJob(Configuration conf, int numMaps, int numReds)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ Job job = MapReduceTestUtil.createJob(conf, inDir, outDir,
+ numMaps, numReds);
+ job.setJobSetupCleanupNeeded(false);
+ job.setOutputFormatClass(MyOutputFormat.class);
+ job.waitForCompletion(true);
+ assertTrue(job.isSuccessful());
+ JobID jobid = (org.apache.hadoop.mapred.JobID) job.getJobID();
+ JobClient jc = new JobClient(new JobConf(conf));
+ assertTrue(jc.getSetupTaskReports(jobid).length == 0);
+ assertTrue(jc.getCleanupTaskReports(jobid).length == 0);
+ assertTrue(jc.getMapTaskReports(jobid).length == numMaps);
+ assertTrue(jc.getReduceTaskReports(jobid).length == numReds);
+ FileSystem fs = FileSystem.get(conf);
+ assertTrue("Job output directory doesn't exit!", fs.exists(outDir));
+ FileStatus[] list = fs.listStatus(outDir, new OutputFilter());
+ int numPartFiles = numReds == 0 ? numMaps : numReds;
+ assertTrue("Number of part-files is " + list.length + " and not "
+ + numPartFiles, list.length == numPartFiles);
+ return job;
+ }
+
+ public void testNoJobSetupCleanup() throws Exception {
+ try {
+ Configuration conf = createJobConf();
+
+ // run a job without job-setup and cleanup
+ submitAndValidateJob(conf, 1, 1);
+
+ // run a map only job.
+ submitAndValidateJob(conf, 1, 0);
+
+ // run empty job without job setup and cleanup
+ submitAndValidateJob(conf, 0, 0);
+
+ // run empty job without job setup and cleanup, with non-zero reduces
+ submitAndValidateJob(conf, 0, 1);
+ } finally {
+ tearDown();
+ }
+ }
+
+ static class MyOutputFormat extends TextOutputFormat {
+ public void checkOutputSpecs(JobContext job)
+ throws FileAlreadyExistsException, IOException{
+ super.checkOutputSpecs(job);
+ // creating dummy TaskAttemptID
+ TaskAttemptID tid = new TaskAttemptID("jt", 1, false, 0, 0);
+ getOutputCommitter(new TaskAttemptContext(job.getConfiguration(), tid)).
+ setupJob(job);
+ }
+ }
+
+ private static class OutputFilter implements PathFilter {
+ public boolean accept(Path path) {
+ return !(path.getName().startsWith("_"));
+ }
+ }
+}
\ No newline at end of file
Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java
------------------------------------------------------------------------------
svn:eol-style = native