You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by ch...@apache.org on 2013/03/25 13:56:28 UTC

git commit: CRUNCH-182: Remove CrunchJob and merge its logic into CrunchControlledJob

Updated Branches:
  refs/heads/master bd72e849f -> b34c2f22f


CRUNCH-182: Remove CrunchJob and merge its logic into CrunchControlledJob


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b34c2f22
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b34c2f22
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b34c2f22

Branch: refs/heads/master
Commit: b34c2f22fb9543de1c4ac8158282be1776b77ce9
Parents: bd72e84
Author: Chao Shi <ch...@apache.org>
Authored: Sat Mar 23 15:18:58 2013 +0800
Committer: Chao Shi <ch...@apache.org>
Committed: Sat Mar 23 15:20:48 2013 +0800

----------------------------------------------------------------------
 .../lib/jobcontrol/CrunchControlledJob.java        |  133 +++++------
 .../mapreduce/lib/jobcontrol/CrunchJobControl.java |   68 ++----
 .../org/apache/crunch/impl/mr/exec/CrunchJob.java  |  175 ---------------
 .../apache/crunch/impl/mr/exec/CrunchJobHooks.java |  153 +++++++++++++
 .../org/apache/crunch/impl/mr/exec/MRExecutor.java |    2 +-
 .../apache/crunch/impl/mr/plan/JobPrototype.java   |   35 ++-
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java    |    9 +-
 .../crunch/impl/mr/run/RuntimeParameters.java      |    2 +
 .../crunch/impl/mr/exec/CrunchJobHooksTest.java    |   42 ++++
 .../apache/crunch/impl/mr/exec/CrunchJobTest.java  |   42 ----
 10 files changed, 310 insertions(+), 351 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
index 223673e..93926c1 100644
--- a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
+++ b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
@@ -18,17 +18,18 @@
 package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+
 /**
  * This class encapsulates a MapReduce job and its dependency. It monitors the
  * states of the depending jobs and updates the state of this job. A job starts
@@ -46,48 +47,50 @@ public class CrunchControlledJob {
     SUCCESS, WAITING, RUNNING, READY, FAILED, DEPENDENT_FAILED
   };
 
-  public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
-  protected State state;
-  protected Job job; // mapreduce job to be executed.
-  // some info for human consumption, e.g. the reason why the job failed
-  protected String message;
-  private String controlID; // assigned and used by JobControl class
+  public static interface Hook {
+    public void run() throws IOException;
+  }
+
+  private static final Log LOG = LogFactory.getLog(CrunchControlledJob.class);
+
+  private final int jobID;
+  private final Job job; // mapreduce job to be executed.
   // the jobs the current job depends on
-  private List<CrunchControlledJob> dependingJobs;
+  private final List<CrunchControlledJob> dependingJobs;
+  private final Hook prepareHook;
+  private final Hook completionHook;
+  private State state;
+  // some info for human consumption, e.g. the reason why the job failed
+  private String message;
+  private String lastKnownProgress;
 
   /**
    * Construct a job.
-   * 
+   *
+   * @param jobID
+   *          an ID used to match this job with its {@link org.apache.crunch.impl.mr.plan.JobPrototype}.
    * @param job
    *          a mapreduce job to be executed.
-   * @param dependingJobs
-   *          an array of jobs the current job depends on
+   * @param prepareHook
+   *          a piece of code that will run before this job is submitted.
+   * @param completionHook
+   *          a piece of code that will run after this job gets completed.
    */
-  public CrunchControlledJob(Job job, List<CrunchControlledJob> dependingJobs)
-      throws IOException {
+  public CrunchControlledJob(int jobID, Job job, Hook prepareHook, Hook completionHook) {
+    this.jobID = jobID;
     this.job = job;
-    this.dependingJobs = dependingJobs;
+    this.dependingJobs = Lists.newArrayList();
+    this.prepareHook = prepareHook;
+    this.completionHook = completionHook;
     this.state = State.WAITING;
-    this.controlID = "unassigned";
     this.message = "just initialized";
   }
 
-  /**
-   * Construct a job.
-   * 
-   * @param conf
-   *          mapred job configuration representing a job to be executed.
-   * @throws IOException
-   */
-  public CrunchControlledJob(Configuration conf) throws IOException {
-    this(new Job(conf), null);
-  }
-
   @Override
   public String toString() {
     StringBuffer sb = new StringBuffer();
     sb.append("job name:\t").append(this.job.getJobName()).append("\n");
-    sb.append("job id:\t").append(this.controlID).append("\n");
+    sb.append("job id:\t").append(this.jobID).append("\n");
     sb.append("job state:\t").append(this.state).append("\n");
     sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n");
     sb.append("job message:\t").append(this.message).append("\n");
@@ -114,7 +117,7 @@ public class CrunchControlledJob {
 
   /**
    * Set the job name for this job.
-   * 
+   *
    * @param jobName
    *          the job name
    */
@@ -123,20 +126,10 @@ public class CrunchControlledJob {
   }
 
   /**
-   * @return the job ID of this job assigned by JobControl
-   */
-  public String getJobID() {
-    return this.controlID;
-  }
-
-  /**
-   * Set the job ID for this job.
-   * 
-   * @param id
-   *          the job ID
+   * @return the job ID of this job
    */
-  public void setJobID(String id) {
-    this.controlID = id;
+  public int getJobID() {
+    return this.jobID;
   }
 
   /**
@@ -154,16 +147,6 @@ public class CrunchControlledJob {
   }
 
   /**
-   * Set the mapreduce job
-   * 
-   * @param job
-   *          the mapreduce job for this job.
-   */
-  public synchronized void setJob(Job job) {
-    this.job = job;
-  }
-
-  /**
    * @return the state of this job
    */
   public synchronized State getJobState() {
@@ -214,9 +197,6 @@ public class CrunchControlledJob {
    */
   public synchronized boolean addDependingJob(CrunchControlledJob dependingJob) {
     if (this.state == State.WAITING) { // only allowed to add jobs when waiting
-      if (this.dependingJobs == null) {
-        this.dependingJobs = new ArrayList<CrunchControlledJob>();
-      }
       return this.dependingJobs.add(dependingJob);
     } else {
       return false;
@@ -246,7 +226,7 @@ public class CrunchControlledJob {
    * Check the state of this running job. The state may remain the same, become
    * SUCCEEDED or FAILED.
    */
-  protected void checkRunningState() throws IOException, InterruptedException {
+  private void checkRunningState() throws IOException, InterruptedException {
     try {
       if (job.isComplete()) {
         if (job.isSuccessful()) {
@@ -255,6 +235,11 @@ public class CrunchControlledJob {
           this.state = State.FAILED;
           this.message = "Job failed!";
         }
+      } else {
+        // still running
+        if (job.getConfiguration().getBoolean(RuntimeParameters.LOG_JOB_PROGRESS, false)) {
+          logJobProgress();
+        }
       }
     } catch (IOException ioe) {
       this.state = State.FAILED;
@@ -266,6 +251,9 @@ public class CrunchControlledJob {
       } catch (IOException e) {
       }
     }
+    if (isCompleted()) {
+      completionHook.run();
+    }
   }
 
   /**
@@ -313,26 +301,25 @@ public class CrunchControlledJob {
    */
   protected synchronized void submit() {
     try {
-      Configuration conf = job.getConfiguration();
-      if (conf.getBoolean(CREATE_DIR, false)) {
-        Path[] inputPaths = FileInputFormat.getInputPaths(job);
-        for (Path inputPath : inputPaths) {
-          FileSystem fs = inputPath.getFileSystem(conf);
-          if (!fs.exists(inputPath)) {
-            try {
-              fs.mkdirs(inputPath);
-            } catch (IOException e) {
-
-            }
-          }
-        }
-      }
+      prepareHook.run();
       job.submit();
       this.state = State.RUNNING;
+      LOG.info("Running job \"" + getJobName() + "\"");
+      LOG.info("Job status available at: " + job.getTrackingURL());
     } catch (Exception ioe) {
       this.state = State.FAILED;
       this.message = StringUtils.stringifyException(ioe);
+      LOG.info("Error occurred starting job \"" + getJobName() + "\":");
+      LOG.info(getMessage());
     }
   }
 
+  private void logJobProgress() throws IOException, InterruptedException {
+    String progress = String.format("map %.0f%% reduce %.0f%%",
+        100.0 * job.mapProgress(), 100.0 * job.reduceProgress());
+    if (!Objects.equal(lastKnownProgress, progress)) {
+      LOG.info(job.getJobName() + " progress: " + progress);
+      lastKnownProgress = progress;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
index 0342ad4..727ab6f 100644
--- a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
+++ b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
@@ -19,7 +19,6 @@ package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
@@ -40,16 +39,15 @@ import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.Sta
  */
 public class CrunchJobControl {
 
-  private Map<String, CrunchControlledJob> waitingJobs;
-  private Map<String, CrunchControlledJob> readyJobs;
-  private Map<String, CrunchControlledJob> runningJobs;
-  private Map<String, CrunchControlledJob> successfulJobs;
-  private Map<String, CrunchControlledJob> failedJobs;
+  private Map<Integer, CrunchControlledJob> waitingJobs;
+  private Map<Integer, CrunchControlledJob> readyJobs;
+  private Map<Integer, CrunchControlledJob> runningJobs;
+  private Map<Integer, CrunchControlledJob> successfulJobs;
+  private Map<Integer, CrunchControlledJob> failedJobs;
 
   private Log log = LogFactory.getLog(CrunchJobControl.class);
 
-  private long nextJobID;
-  private String groupName;
+  private final String groupName;
 
   /**
    * Construct a job control for a group of jobs.
@@ -58,16 +56,15 @@ public class CrunchJobControl {
    *          a name identifying this group
    */
   public CrunchJobControl(String groupName) {
-    this.waitingJobs = new Hashtable<String, CrunchControlledJob>();
-    this.readyJobs = new Hashtable<String, CrunchControlledJob>();
-    this.runningJobs = new Hashtable<String, CrunchControlledJob>();
-    this.successfulJobs = new Hashtable<String, CrunchControlledJob>();
-    this.failedJobs = new Hashtable<String, CrunchControlledJob>();
-    this.nextJobID = -1;
+    this.waitingJobs = new Hashtable<Integer, CrunchControlledJob>();
+    this.readyJobs = new Hashtable<Integer, CrunchControlledJob>();
+    this.runningJobs = new Hashtable<Integer, CrunchControlledJob>();
+    this.successfulJobs = new Hashtable<Integer, CrunchControlledJob>();
+    this.failedJobs = new Hashtable<Integer, CrunchControlledJob>();
     this.groupName = groupName;
   }
 
-  private static List<CrunchControlledJob> toList(Map<String, CrunchControlledJob> jobs) {
+  private static List<CrunchControlledJob> toList(Map<Integer, CrunchControlledJob> jobs) {
     ArrayList<CrunchControlledJob> retv = new ArrayList<CrunchControlledJob>();
     synchronized (jobs) {
       for (CrunchControlledJob job : jobs.values()) {
@@ -109,25 +106,20 @@ public class CrunchJobControl {
     return toList(this.failedJobs);
   }
 
-  private String getNextJobID() {
-    nextJobID += 1;
-    return this.groupName + this.nextJobID;
-  }
-
   private static void addToQueue(CrunchControlledJob aJob,
-      Map<String, CrunchControlledJob> queue) {
+      Map<Integer, CrunchControlledJob> queue) {
     synchronized (queue) {
       queue.put(aJob.getJobID(), aJob);
     }
   }
 
   private void addToQueue(CrunchControlledJob aJob) {
-    Map<String, CrunchControlledJob> queue = getQueue(aJob.getJobState());
+    Map<Integer, CrunchControlledJob> queue = getQueue(aJob.getJobState());
     addToQueue(aJob, queue);
   }
 
-  private Map<String, CrunchControlledJob> getQueue(State state) {
-    Map<String, CrunchControlledJob> retv = null;
+  private Map<Integer, CrunchControlledJob> getQueue(State state) {
+    Map<Integer, CrunchControlledJob> retv = null;
     if (state == State.WAITING) {
       retv = this.waitingJobs;
     } else if (state == State.READY) {
@@ -148,31 +140,17 @@ public class CrunchJobControl {
    * @param aJob
    *          the new job
    */
-  synchronized public String addJob(CrunchControlledJob aJob) {
-    String id = this.getNextJobID();
-    aJob.setJobID(id);
+  synchronized public void addJob(CrunchControlledJob aJob) {
     aJob.setJobState(State.WAITING);
     this.addToQueue(aJob);
-    return id;
-  }
-
-  /**
-   * Add a collection of jobs
-   * 
-   * @param jobs
-   */
-  public void addJobCollection(Collection<CrunchControlledJob> jobs) {
-    for (CrunchControlledJob job : jobs) {
-      addJob(job);
-    }
   }
 
   synchronized private void checkRunningJobs() throws IOException,
       InterruptedException {
 
-    Map<String, CrunchControlledJob> oldJobs = null;
+    Map<Integer, CrunchControlledJob> oldJobs = null;
     oldJobs = this.runningJobs;
-    this.runningJobs = new Hashtable<String, CrunchControlledJob>();
+    this.runningJobs = new Hashtable<Integer, CrunchControlledJob>();
 
     for (CrunchControlledJob nextJob : oldJobs.values()) {
       nextJob.checkState();
@@ -182,9 +160,9 @@ public class CrunchJobControl {
 
   synchronized private void checkWaitingJobs() throws IOException,
       InterruptedException {
-    Map<String, CrunchControlledJob> oldJobs = null;
+    Map<Integer, CrunchControlledJob> oldJobs = null;
     oldJobs = this.waitingJobs;
-    this.waitingJobs = new Hashtable<String, CrunchControlledJob>();
+    this.waitingJobs = new Hashtable<Integer, CrunchControlledJob>();
 
     for (CrunchControlledJob nextJob : oldJobs.values()) {
       nextJob.checkState();
@@ -193,9 +171,9 @@ public class CrunchJobControl {
   }
 
   synchronized private void startReadyJobs() {
-    Map<String, CrunchControlledJob> oldJobs = null;
+    Map<Integer, CrunchControlledJob> oldJobs = null;
     oldJobs = this.readyJobs;
-    this.readyJobs = new Hashtable<String, CrunchControlledJob>();
+    this.readyJobs = new Hashtable<Integer, CrunchControlledJob>();
 
     for (CrunchControlledJob nextJob : oldJobs.values()) {
       // Submitting Job to Hadoop

http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
deleted file mode 100644
index f0e5cd1..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.exec;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
-import org.apache.crunch.impl.mr.plan.MSCROutputHandler;
-import org.apache.crunch.impl.mr.plan.PlanningParameters;
-import org.apache.crunch.impl.mr.run.RuntimeParameters;
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.PathTarget;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.StringUtils;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
-
-public class CrunchJob extends CrunchControlledJob {
-
-  private final Log log = LogFactory.getLog(CrunchJob.class);
-
-  private final Path workingPath;
-  private final Map<Integer, PathTarget> multiPaths;
-  private final boolean mapOnlyJob;
-  private String lastKnownProgress;
-
-  public CrunchJob(Job job, Path workingPath, MSCROutputHandler handler) throws IOException {
-    super(job, Lists.<CrunchControlledJob> newArrayList());
-    this.workingPath = workingPath;
-    this.multiPaths = handler.getMultiPaths();
-    this.mapOnlyJob = handler.isMapOnlyJob();
-  }
-
-  private synchronized void handleMultiPaths() throws IOException {
-    if (!multiPaths.isEmpty()) {
-      // Need to handle moving the data from the output directory of the
-      // job to the output locations specified in the paths.
-      FileSystem srcFs = workingPath.getFileSystem(job.getConfiguration());
-      for (Map.Entry<Integer, PathTarget> entry : multiPaths.entrySet()) {
-        final int i = entry.getKey();
-        final Path dst = entry.getValue().getPath();
-        FileNamingScheme fileNamingScheme = entry.getValue().getFileNamingScheme();
-
-        Path src = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + i + "-*");
-        Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src);
-        Configuration conf = job.getConfiguration();
-        FileSystem dstFs = dst.getFileSystem(conf);
-        if (!dstFs.exists(dst)) {
-          dstFs.mkdirs(dst);
-        }
-        boolean sameFs = isCompatible(srcFs, dst);
-        for (Path s : srcs) {
-          Path d = getDestFile(conf, s, dst, fileNamingScheme);
-          if (sameFs) {
-            srcFs.rename(s, d);
-          } else {
-            FileUtil.copy(srcFs, s, dstFs, d, true, true, job.getConfiguration());
-          }
-        }
-      }
-    }
-  }
-
-  private boolean isCompatible(FileSystem fs, Path path) {
-    try {
-      fs.makeQualified(path);
-      return true;
-    } catch (IllegalArgumentException e) {
-      return false;
-    }
-  }
-
-  private Path getDestFile(Configuration conf, Path src, Path dir, FileNamingScheme fileNamingScheme)
-      throws IOException {
-    String outputFilename = null;
-    if (mapOnlyJob) {
-      outputFilename = fileNamingScheme.getMapOutputName(conf, dir);
-    } else {
-      outputFilename = fileNamingScheme.getReduceOutputName(conf, dir, CrunchJob.extractPartitionNumber(src.getName()));
-    }
-    if (src.getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) {
-      outputFilename += org.apache.avro.mapred.AvroOutputFormat.EXT;
-    }
-    return new Path(dir, outputFilename);
-  }
-
-  @Override
-  protected void checkRunningState() throws IOException, InterruptedException {
-    try {
-      if (job.isComplete()) {
-        if (job.isSuccessful()) {
-          handleMultiPaths();
-          this.state = State.SUCCESS;
-        } else {
-          this.state = State.FAILED;
-          this.message = "Job failed!";
-        }
-      } else { // still running
-        if (job.getConfiguration().getBoolean(RuntimeParameters.LOG_JOB_PROGRESS, false)) {
-          logJobProgress();
-        }
-      }
-    } catch (IOException ioe) {
-      this.state = State.FAILED;
-      this.message = StringUtils.stringifyException(ioe);
-      try {
-        if (job != null) {
-          job.killJob();
-        }
-      } catch (IOException e) {
-      }
-    }
-  }
-
-  @Override
-  protected synchronized void submit() {
-    super.submit();
-    if (this.state == State.RUNNING) {
-      log.info("Running job \"" + getJobName() + "\"");
-      log.info("Job status available at: " + job.getTrackingURL());
-    } else {
-      log.info("Error occurred starting job \"" + getJobName() + "\":");
-      log.info(getMessage());
-    }
-  }
-
-  private void logJobProgress() throws IOException, InterruptedException {
-    String progress = String.format("map %.0f%% reduce %.0f%%",
-        100.0 * job.mapProgress(), 100.0 * job.reduceProgress());
-    if (!Objects.equal(lastKnownProgress, progress)) {
-      log.info(job.getJobName() + " progress: " + progress);
-      lastKnownProgress = progress;
-    }
-  }
-
-  /**
-   * Extract the partition number from a raw reducer output filename.
-   * 
-   * @param fileName The raw reducer output file name
-   * @return The partition number encoded in the filename
-   */
-  static int extractPartitionNumber(String reduceOutputFileName) {
-    Matcher matcher = Pattern.compile(".*-r-(\\d{5})").matcher(reduceOutputFileName);
-    if (matcher.find()) {
-      return Integer.parseInt(matcher.group(1), 10);
-    } else {
-      throw new IllegalArgumentException("Reducer output name '" + reduceOutputFileName + "' cannot be parsed");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
new file mode 100644
index 0000000..74bc9ac
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.exec;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
+import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.PathTarget;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+public final class CrunchJobHooks {
+
+  private CrunchJobHooks() {}
+
+  /** Creates missing input directories before the job is submitted, if {@code CREATE_DIR} is enabled. */
+  public static final class PrepareHook implements CrunchControlledJob.Hook {
+    private final Job job;
+
+    public PrepareHook(Job job) {
+      this.job = job;
+    }
+
+    @Override
+    public void run() throws IOException {
+      Configuration conf = job.getConfiguration();
+      if (conf.getBoolean(RuntimeParameters.CREATE_DIR, false)) {
+        Path[] inputPaths = FileInputFormat.getInputPaths(job);
+        for (Path inputPath : inputPaths) {
+          FileSystem fs = inputPath.getFileSystem(conf);
+          if (!fs.exists(inputPath)) {
+            try {
+              fs.mkdirs(inputPath);
+            } catch (IOException e) {
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /** Moves the output files produced by the MapReduce job to the specified directories. */
+  public static final class CompletionHook implements CrunchControlledJob.Hook {
+    private final Job job;
+    private final Path workingPath;
+    private final Map<Integer, PathTarget> multiPaths;
+    private final boolean mapOnlyJob;
+
+    public CompletionHook(Job job, Path workingPath, Map<Integer, PathTarget> multiPaths, boolean mapOnlyJob) {
+      this.job = job;
+      this.workingPath = workingPath;
+      this.multiPaths = multiPaths;
+      this.mapOnlyJob = mapOnlyJob;
+    }
+
+    @Override
+    public void run() throws IOException {
+      handleMultiPaths();
+    }
+
+    private synchronized void handleMultiPaths() throws IOException {
+      if (!multiPaths.isEmpty()) {
+        // Need to handle moving the data from the output directory of the
+        // job to the output locations specified in the paths.
+        FileSystem srcFs = workingPath.getFileSystem(job.getConfiguration());
+        for (Map.Entry<Integer, PathTarget> entry : multiPaths.entrySet()) {
+          final int i = entry.getKey();
+          final Path dst = entry.getValue().getPath();
+          FileNamingScheme fileNamingScheme = entry.getValue().getFileNamingScheme();
+
+          Path src = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + i + "-*");
+          Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src);
+          Configuration conf = job.getConfiguration();
+          FileSystem dstFs = dst.getFileSystem(conf);
+          if (!dstFs.exists(dst)) {
+            dstFs.mkdirs(dst);
+          }
+          boolean sameFs = isCompatible(srcFs, dst);
+          for (Path s : srcs) {
+            Path d = getDestFile(conf, s, dst, fileNamingScheme);
+            if (sameFs) {
+              srcFs.rename(s, d);
+            } else {
+              FileUtil.copy(srcFs, s, dstFs, d, true, true, job.getConfiguration());
+            }
+          }
+        }
+      }
+    }
+
+    private boolean isCompatible(FileSystem fs, Path path) {
+      try {
+        fs.makeQualified(path);
+        return true;
+      } catch (IllegalArgumentException e) {
+        return false;
+      }
+    }
+    private Path getDestFile(Configuration conf, Path src, Path dir, FileNamingScheme fileNamingScheme)
+        throws IOException {
+      String outputFilename = null;
+      if (mapOnlyJob) {
+        outputFilename = fileNamingScheme.getMapOutputName(conf, dir);
+      } else {
+        outputFilename = fileNamingScheme.getReduceOutputName(conf, dir, extractPartitionNumber(src.getName()));
+      }
+      if (src.getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) {
+        outputFilename += org.apache.avro.mapred.AvroOutputFormat.EXT;
+      }
+      return new Path(dir, outputFilename);
+    }
+  }
+
+  /**
+   * Extract the partition number from a raw reducer output filename.
+   *
+   * @param reduceOutputFileName The raw reducer output file name
+   * @return The partition number encoded in the filename
+   */
+  static int extractPartitionNumber(String reduceOutputFileName) {
+    Matcher matcher = Pattern.compile(".*-r-(\\d{5})").matcher(reduceOutputFileName);
+    if (matcher.find()) {
+      return Integer.parseInt(matcher.group(1), 10);
+    } else {
+      throw new IllegalArgumentException("Reducer output name '" + reduceOutputFileName + "' cannot be parsed");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index a784b66..4c7b7ea 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -80,7 +80,7 @@ public class MRExecutor implements PipelineExecution {
       : new CappedExponentialCounter(500, 10000);
   }
 
-  public void addJob(CrunchJob job) {
+  public void addJob(CrunchControlledJob job) {
     this.control.addJob(job);
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index 181468f..f22b5a1 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -25,10 +25,11 @@ import java.util.Set;
 
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.Target;
+import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
 import org.apache.crunch.impl.mr.collect.DoTableImpl;
 import org.apache.crunch.impl.mr.collect.PCollectionImpl;
 import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
-import org.apache.crunch.impl.mr.exec.CrunchJob;
+import org.apache.crunch.impl.mr.exec.CrunchJobHooks;
 import org.apache.crunch.impl.mr.run.CrunchCombiner;
 import org.apache.crunch.impl.mr.run.CrunchInputFormat;
 import org.apache.crunch.impl.mr.run.CrunchMapper;
@@ -49,14 +50,16 @@ import com.google.common.collect.Sets;
 
 class JobPrototype {
 
-  public static JobPrototype createMapReduceJob(PGroupedTableImpl<?, ?> group, Set<NodePath> inputs, Path workingPath) {
-    return new JobPrototype(inputs, group, workingPath);
+  public static JobPrototype createMapReduceJob(int jobID, PGroupedTableImpl<?, ?> group,
+      Set<NodePath> inputs, Path workingPath) {
+    return new JobPrototype(jobID, inputs, group, workingPath);
   }
 
-  public static JobPrototype createMapOnlyJob(HashMultimap<Target, NodePath> mapNodePaths, Path workingPath) {
-    return new JobPrototype(mapNodePaths, workingPath);
+  public static JobPrototype createMapOnlyJob(int jobID, HashMultimap<Target, NodePath> mapNodePaths, Path workingPath) {
+    return new JobPrototype(jobID, mapNodePaths, workingPath);
   }
 
+  private final int jobID; // TODO: maybe stageID sounds better
   private final Set<NodePath> mapNodePaths;
   private final PGroupedTableImpl<?, ?> group;
   private final Set<JobPrototype> dependencies = Sets.newHashSet();
@@ -66,22 +69,28 @@ class JobPrototype {
   private HashMultimap<Target, NodePath> targetsToNodePaths;
   private DoTableImpl<?, ?> combineFnTable;
 
-  private CrunchJob job;
+  private CrunchControlledJob job;
 
-  private JobPrototype(Set<NodePath> inputs, PGroupedTableImpl<?, ?> group, Path workingPath) {
+  private JobPrototype(int jobID, Set<NodePath> inputs, PGroupedTableImpl<?, ?> group, Path workingPath) {
+    this.jobID = jobID;
     this.mapNodePaths = ImmutableSet.copyOf(inputs);
     this.group = group;
     this.workingPath = workingPath;
     this.targetsToNodePaths = null;
   }
 
-  private JobPrototype(HashMultimap<Target, NodePath> outputPaths, Path workingPath) {
+  private JobPrototype(int jobID, HashMultimap<Target, NodePath> outputPaths, Path workingPath) {
+    this.jobID = jobID;
     this.group = null;
     this.mapNodePaths = null;
     this.workingPath = workingPath;
     this.targetsToNodePaths = outputPaths;
   }
 
+  public int getJobID() {
+    return jobID;
+  }
+
   public boolean isMapOnly() {
     return this.group == null;
   }
@@ -109,7 +118,7 @@ class JobPrototype {
     this.dependencies.add(dependency);
   }
 
-  public CrunchJob getCrunchJob(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException {
+  public CrunchControlledJob getCrunchJob(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException {
     if (job == null) {
       job = build(jarClass, conf, pipeline);
       for (JobPrototype proto : dependencies) {
@@ -119,7 +128,7 @@ class JobPrototype {
     return job;
   }
 
-  private CrunchJob build(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException {
+  private CrunchControlledJob build(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException {
     Job job = new Job(conf);
     conf = job.getConfiguration();
     conf.set(PlanningParameters.CRUNCH_WORKING_DIRECTORY, workingPath.toString());
@@ -190,7 +199,11 @@ class JobPrototype {
     }
     job.setJobName(createJobName(pipeline.getName(), inputNodes, reduceNode));
 
-    return new CrunchJob(job, outputPath, outputHandler);
+    return new CrunchControlledJob(
+        jobID,
+        job,
+        new CrunchJobHooks.PrepareHook(job),
+        new CrunchJobHooks.CompletionHook(job, outputPath, outputHandler.getMultiPaths(), group == null));
   }
 
   private void serialize(List<DoNode> nodes, Configuration conf, Path workingPath, NodeContext context)

http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 146bcbf..3e1de38 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -46,7 +46,8 @@ public class MSCRPlanner {
   private final MRPipeline pipeline;
   private final Map<PCollectionImpl<?>, Set<Target>> outputs;
   private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
-  
+  private int lastJobID = 0;
+
   public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs,
       Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) {
     this.pipeline = pipeline;
@@ -267,7 +268,7 @@ public class MSCRPlanner {
         throw new IllegalStateException("No outputs?");
       }
       JobPrototype prototype = JobPrototype.createMapOnlyJob(
-          outputPaths, pipeline.createTempPath()); 
+          ++lastJobID, outputPaths, pipeline.createTempPath());
       for (Vertex v : component) {
         assignment.put(v, prototype);
       }
@@ -280,7 +281,7 @@ public class MSCRPlanner {
           usedEdges.add(e);
         }
         JobPrototype prototype = JobPrototype.createMapReduceJob(
-            (PGroupedTableImpl) g.getPCollection(), inputs, pipeline.createTempPath());
+            ++lastJobID, (PGroupedTableImpl) g.getPCollection(), inputs, pipeline.createTempPath());
         assignment.put(g, prototype);
         for (Edge e : g.getIncomingEdges()) {
           assignment.put(e.getHead(), prototype);
@@ -335,7 +336,7 @@ public class MSCRPlanner {
       }
       if (!outputPaths.isEmpty()) {
         JobPrototype prototype = JobPrototype.createMapOnlyJob(
-            outputPaths, pipeline.createTempPath());
+            ++lastJobID, outputPaths, pipeline.createTempPath());
         for (Vertex orphan : orphans) {
           assignment.put(orphan, prototype);
         }

http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
index 1ee19e7..604c49c 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
@@ -30,6 +30,8 @@ public class RuntimeParameters {
 
   public static final String LOG_JOB_PROGRESS = "crunch.log.job.progress";
 
+  public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
+
   // Not instantiated
   private RuntimeParameters() {
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java
new file mode 100644
index 0000000..f03c3e2
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.exec;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class CrunchJobHooksTest {
+
+  @Test
+  public void testExtractPartitionNumber() {
+    assertEquals(0, CrunchJobHooks.extractPartitionNumber("out1-r-00000"));
+    assertEquals(10, CrunchJobHooks.extractPartitionNumber("out2-r-00010"));
+    assertEquals(99999, CrunchJobHooks.extractPartitionNumber("out3-r-99999"));
+  }
+
+  @Test
+  public void testExtractPartitionNumber_WithSuffix() {
+    assertEquals(10, CrunchJobHooks.extractPartitionNumber("out2-r-00010.avro"));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testExtractPartitionNumber_MapOutputFile() {
+    CrunchJobHooks.extractPartitionNumber("out1-m-00000");
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobTest.java
deleted file mode 100644
index 00ad830..0000000
--- a/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.exec;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-public class CrunchJobTest {
-
-  @Test
-  public void testExtractPartitionNumber() {
-    assertEquals(0, CrunchJob.extractPartitionNumber("out1-r-00000"));
-    assertEquals(10, CrunchJob.extractPartitionNumber("out2-r-00010"));
-    assertEquals(99999, CrunchJob.extractPartitionNumber("out3-r-99999"));
-  }
-
-  @Test
-  public void testExtractPartitionNumber_WithSuffix() {
-    assertEquals(10, CrunchJob.extractPartitionNumber("out2-r-00010.avro"));
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testExtractPartitionNumber_MapOutputFile() {
-    CrunchJob.extractPartitionNumber("out1-m-00000");
-  }
-}