You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/08/13 20:34:33 UTC

svn commit: r1372541 - in /hadoop/common/branches/branch-1: ./ src/mapred/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/ src/test/org/apache/hadoop/mapreduce/

Author: tomwhite
Date: Mon Aug 13 18:34:33 2012
New Revision: 1372541

URL: http://svn.apache.org/viewvc?rev=1372541&view=rev
Log:
MAPREDUCE-4488. Port MAPREDUCE-463 (The job setup and cleanup tasks should be optional) to branch-1.

Added:
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java   (with props)
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/Job.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1372541&r1=1372540&r2=1372541&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Mon Aug 13 18:34:33 2012
@@ -27,6 +27,9 @@ Release 1.2.0 - unreleased
     module external to HDFS to specify how HDFS blocks should be placed.
     (Sumadhur Reddy Bolli via szetszwo)
 
+    MAPREDUCE-4488. Port MAPREDUCE-463 (The job setup and cleanup tasks
+    should be optional) to branch-1. (tomwhite)
+
   IMPROVEMENTS
 
     HDFS-3515. Port HDFS-1457 to branch-1. (eli)

Modified: hadoop/common/branches/branch-1/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/mapred-default.xml?rev=1372541&r1=1372540&r2=1372541&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-1/src/mapred/mapred-default.xml Mon Aug 13 18:34:33 2012
@@ -35,6 +35,14 @@
   </description>
 </property>
 
+<property>
+  <name>mapred.committer.job.setup.cleanup.needed</name>
+  <value>true</value>
+  <description>If true, the job's job-setup and job-cleanup tasks are run;
+               if false, both are skipped.
+  </description>
+</property>
+
 <!-- i/o properties -->
 
 <property>

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1372541&r1=1372540&r2=1372541&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Mon Aug 13 18:34:33 2012
@@ -131,6 +131,7 @@ public class JobInProgress {
   private volatile boolean launchedSetup = false;
   private volatile boolean jobKilled = false;
   private volatile boolean jobFailed = false;
+  private boolean jobSetupCleanupNeeded = true;
 
   JobPriority priority = JobPriority.NORMAL;
   final JobTracker jobtracker;
@@ -361,6 +362,8 @@ public class JobInProgress {
 
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
       (numMapTasks + numReduceTasks + 10);
+    JobContext jobContext = new JobContext(conf, jobId);
+    this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
     try {
       this.userUGI = UserGroupInformation.getCurrentUser();
     } catch (IOException ie){
@@ -449,6 +452,9 @@ public class JobInProgress {
 
       this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
       (numMapTasks + numReduceTasks + 10);
+      
+      JobContext jobContext = new JobContext(conf, jobId);
+      this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
 
       // Construct the jobACLs
       status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));
@@ -757,7 +763,35 @@ public class JobInProgress {
     
     // ... use the same for estimating the total output of all maps
     resourceEstimator.setThreshhold(completedMapsForReduceSlowstart);
+
+    initSetupCleanupTasks(jobFile);
+
+    synchronized(jobInitKillStatus){
+      jobInitKillStatus.initDone = true;
+
+      // set this before the throw to make sure cleanup works properly
+      tasksInited = true;
+
+      if(jobInitKillStatus.killed) {
+        throw new KillInterruptedException("Job " + jobId + " killed in init");
+      }
+    }
+    
+    JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, 
+                                 numMapTasks, numReduceTasks);
     
+    // if setup is not needed, mark it complete
+    if (!jobSetupCleanupNeeded) {
+      setupComplete();
+    }
+  }
+     
+  private void initSetupCleanupTasks(String jobFile) {
+    if (!jobSetupCleanupNeeded) {
+      // nothing to initialize
+      return;
+    }
+
     // create cleanup two cleanup tips, one map and one reduce.
     cleanup = new TaskInProgress[2];
 
@@ -787,25 +821,23 @@ public class JobInProgress {
                        numReduceTasks + 1, jobtracker, conf, this, 1);
     setup[1].setJobSetupTask();
     
-    synchronized(jobInitKillStatus){
-      jobInitKillStatus.initDone = true;
-
-      // set this before the throw to make sure cleanup works properly
-      tasksInited = true;
-
-      if(jobInitKillStatus.killed) {
-        throw new KillInterruptedException("Job " + jobId + " killed in init");
-      }
-    }
-    
-    JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, 
-                                 numMapTasks, numReduceTasks);
-    
    // Log the number of map and reduce tasks
    LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks
             + " map tasks and " + numReduceTasks + " reduce tasks.");
   }
 
+  /** Marks setup done: finish an empty no-setup job, else PREP to RUNNING. */
+  private void setupComplete() {
+    status.setSetupProgress(1.0f);
+    if (maps.length == 0 && reduces.length == 0 && !jobSetupCleanupNeeded) {
+      jobComplete(); // no tasks and no cleanup: finish right away
+    } else if (this.status.getRunState() == JobStatus.PREP) {
+      // same transition call the setup-task path used before this helper
+      changeStateTo(JobStatus.RUNNING);
+      JobHistory.JobInfo.logStarted(profile.getJobID());
+    }
+  }
+
   TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId)
   throws IOException {
     TaskSplitMetaInfo[] allTaskSplitMetaInfo =
@@ -2622,13 +2654,7 @@ public class JobInProgress {
     if (tip.isJobSetupTask()) {
       // setup task has finished. kill the extra setup tip
       killSetupTip(!tip.isMapTask());
-      // Job can start running now.
-      this.status.setSetupProgress(1.0f);
-      // move the job to running state if the job is in prep state
-      if (this.status.getRunState() == JobStatus.PREP) {
-        changeStateTo(JobStatus.RUNNING);
-        JobHistory.JobInfo.logStarted(profile.getJobID());
-      }
+      setupComplete();
     } else if (tip.isJobCleanupTask()) {
       // cleanup task has finished. Kill the extra cleanup tip
       if (tip.isMapTask()) {
@@ -2687,6 +2713,9 @@ public class JobInProgress {
         }
       }
     }
+    if (!jobSetupCleanupNeeded && canLaunchJobCleanupTask()) {
+      jobComplete();
+    }
     return true;
   }
   
@@ -2747,7 +2776,8 @@ public class JobInProgress {
     //
     // All tasks are complete, then the job is done!
     //
-    if (this.status.getRunState() == JobStatus.RUNNING ) {
+    if (this.status.getRunState() == JobStatus.RUNNING ||
+        this.status.getRunState() == JobStatus.PREP) {
       changeStateTo(JobStatus.SUCCEEDED);
       this.status.setCleanupProgress(1.0f);
       if (maps.length == 0) {
@@ -2881,6 +2911,9 @@ public class JobInProgress {
       for (int i = 0; i < reduces.length; i++) {
         reduces[i].kill();
       }
+      if (!jobSetupCleanupNeeded) {
+        terminateJob(jobTerminationState);
+      }
     }
   }
 
@@ -3169,7 +3202,9 @@ public class JobInProgress {
   }
 
   boolean isSetupFinished() {
-    if (setup[0].isComplete() || setup[0].isFailed() || setup[1].isComplete()
+    // if there is no setup to be launched, consider setup is finished.  
+    if ((tasksInited && setup.length == 0) || 
+        setup[0].isComplete() || setup[0].isFailed() || setup[1].isComplete()
         || setup[1].isFailed()) {
       return true;
     }
@@ -3283,10 +3318,12 @@ public class JobInProgress {
    */
   public synchronized TaskInProgress getTaskInProgress(TaskID tipid) {
     if (tipid.isMap()) {
-      if (tipid.equals(cleanup[0].getTIPId())) { // cleanup map tip
+      // cleanup map tip
+      if (cleanup.length > 0 && tipid.equals(cleanup[0].getTIPId())) {
         return cleanup[0]; 
       }
-      if (tipid.equals(setup[0].getTIPId())) { //setup map tip
+      // setup map tip
+      if (setup.length > 0 && tipid.equals(setup[0].getTIPId())) { 
         return setup[0];
       }
       for (int i = 0; i < maps.length; i++) {
@@ -3295,10 +3332,12 @@ public class JobInProgress {
         }
       }
     } else {
-      if (tipid.equals(cleanup[1].getTIPId())) { // cleanup reduce tip
+      // cleanup reduce tip
+      if (cleanup.length > 0 && tipid.equals(cleanup[1].getTIPId())) { 
         return cleanup[1]; 
       }
-      if (tipid.equals(setup[1].getTIPId())) { //setup reduce tip
+      // setup reduce tip
+      if (setup.length > 0 && tipid.equals(setup[1].getTIPId())) { 
         return setup[1];
       }
       for (int i = 0; i < reduces.length; i++) {

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/Job.java?rev=1372541&r1=1372540&r2=1372541&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/Job.java Mon Aug 13 18:34:33 2012
@@ -349,6 +349,18 @@ public class Job extends JobContext {  
   }
 
   /**
+   * Specify whether job-setup and job-cleanup are needed for the job.
+   * The job state is checked against JobState.DEFINE before the value is set.
+   * 
+   * @param needed <code>true</code> to run the {@link OutputCommitter}'s
+   *               job-setup and job-cleanup, <code>false</code> to skip them.
+   */
+  public void setJobSetupCleanupNeeded(boolean needed) {
+    ensureState(JobState.DEFINE);
+    conf.setBoolean("mapred.committer.job.setup.cleanup.needed", needed);
+  }
+
+  /**
    * Get the URL where some job progress information will be displayed.
    * 
    * @return the URL where some job progress information will be displayed.

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java?rev=1372541&r1=1372540&r2=1372541&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java Mon Aug 13 18:34:33 2012
@@ -275,4 +275,13 @@ public class JobContext {
   public RawComparator<?> getGroupingComparator() {
     return conf.getOutputValueGroupingComparator();
   }
+  
+  /**
+   * Get whether job-setup and job-cleanup are needed for the job.
+   * 
+   * @return the configured value, with <code>true</code> as the default
+   */
+  public boolean getJobSetupCleanupNeeded() {
+    return conf.getBoolean("mapred.committer.job.setup.cleanup.needed", true);
+  }
 }

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java?rev=1372541&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java Mon Aug 13 18:34:33 2012
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+/**
+ * Verifies that a job submitted with job-setup and job-cleanup disabled
+ * succeeds, reports no setup or cleanup tasks, and still writes its output.
+ */
+public class TestNoJobSetupCleanup extends HadoopTestCase {
+  private static final String TEST_ROOT_DIR =
+    new File(System.getProperty("test.build.data","/tmp"))
+    .toURI().toString().replace(' ', '+');
+  private final Path inDir = new Path(TEST_ROOT_DIR, "./wc/input");
+  private final Path outDir = new Path(TEST_ROOT_DIR, "./wc/output");
+
+  public TestNoJobSetupCleanup() throws IOException {
+    super(HadoopTestCase.CLUSTER_MR , HadoopTestCase.LOCAL_FS, 2, 2);
+  }
+
+  /**
+   * Runs a job with job-setup and job-cleanup turned off and checks the
+   * outcome: success, no setup/cleanup task reports, the expected number of
+   * map and reduce task reports, and one part file per reduce (or per map
+   * when there are no reduces).
+   *
+   * @param conf configuration the job is created from
+   * @param numMaps number of map tasks requested
+   * @param numReds number of reduce tasks requested
+   * @return the completed job
+   */
+  private Job submitAndValidateJob(Configuration conf, int numMaps, int numReds) 
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 
+                numMaps, numReds);
+    job.setJobSetupCleanupNeeded(false);
+    job.setOutputFormatClass(MyOutputFormat.class);
+    job.waitForCompletion(true);
+    assertTrue(job.isSuccessful());
+    JobID jobid = (org.apache.hadoop.mapred.JobID) job.getJobID();
+    JobClient jc = new JobClient(new JobConf(conf));
+    try {
+      assertEquals(0, jc.getSetupTaskReports(jobid).length);
+      assertEquals(0, jc.getCleanupTaskReports(jobid).length);
+      assertEquals(numMaps, jc.getMapTaskReports(jobid).length);
+      assertEquals(numReds, jc.getReduceTaskReports(jobid).length);
+    } finally {
+      jc.close();
+    }
+    FileSystem fs = FileSystem.get(conf);
+    assertTrue("Job output directory doesn't exist!", fs.exists(outDir));
+    FileStatus[] list = fs.listStatus(outDir, new OutputFilter());
+    int numPartFiles = numReds == 0 ? numMaps : numReds;
+    assertEquals("Number of part-files", numPartFiles, list.length);
+    return job;
+  }
+
+  /** Exercises the no-setup/no-cleanup path for several map/reduce counts. */
+  public void testNoJobSetupCleanup() throws Exception {
+    try {
+      Configuration conf = createJobConf();
+
+      // run a job without job-setup and cleanup
+      submitAndValidateJob(conf, 1, 1);
+
+      // run a map only job.
+      submitAndValidateJob(conf, 1, 0);
+
+      // run empty job without job setup and cleanup 
+      submitAndValidateJob(conf, 0, 0);
+
+      // run empty job without job setup and cleanup, with non-zero reduces 
+      submitAndValidateJob(conf, 0, 1);
+    } finally {
+      tearDown();
+    }
+  }
+
+  /**
+   * Output format that invokes the committer's setupJob from checkOutputSpecs;
+   * presumably this stands in for the job-setup task the test disables.
+   */
+  static class MyOutputFormat extends TextOutputFormat {
+    @Override
+    public void checkOutputSpecs(JobContext job) 
+        throws FileAlreadyExistsException, IOException{
+      super.checkOutputSpecs(job);
+      // creating dummy TaskAttemptID
+      TaskAttemptID tid = new TaskAttemptID("jt", 1, false, 0, 0);
+      getOutputCommitter(new TaskAttemptContext(job.getConfiguration(), tid)).
+        setupJob(job);
+    }
+  }
+
+  /** Accepts only paths whose name does not start with an underscore. */
+  private static class OutputFilter implements PathFilter {
+    public boolean accept(Path path) {
+      return !(path.getName().startsWith("_"));
+    }
+  }
+}
\ No newline at end of file

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java
------------------------------------------------------------------------------
    svn:eol-style = native