You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@crunch.apache.org by ch...@apache.org on 2013/03/25 13:56:28 UTC
git commit: CRUNCH-182: Remove CrunchJob and merge its logic into CrunchControlledJob
Updated Branches:
refs/heads/master bd72e849f -> b34c2f22f
CRUNCH-182: Remove CrunchJob and merge its logic into CrunchControlledJob
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b34c2f22
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b34c2f22
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b34c2f22
Branch: refs/heads/master
Commit: b34c2f22fb9543de1c4ac8158282be1776b77ce9
Parents: bd72e84
Author: Chao Shi <ch...@apache.org>
Authored: Sat Mar 23 15:18:58 2013 +0800
Committer: Chao Shi <ch...@apache.org>
Committed: Sat Mar 23 15:20:48 2013 +0800
----------------------------------------------------------------------
.../lib/jobcontrol/CrunchControlledJob.java | 133 +++++------
.../mapreduce/lib/jobcontrol/CrunchJobControl.java | 68 ++----
.../org/apache/crunch/impl/mr/exec/CrunchJob.java | 175 ---------------
.../apache/crunch/impl/mr/exec/CrunchJobHooks.java | 153 +++++++++++++
.../org/apache/crunch/impl/mr/exec/MRExecutor.java | 2 +-
.../apache/crunch/impl/mr/plan/JobPrototype.java | 35 ++-
.../apache/crunch/impl/mr/plan/MSCRPlanner.java | 9 +-
.../crunch/impl/mr/run/RuntimeParameters.java | 2 +
.../crunch/impl/mr/exec/CrunchJobHooksTest.java | 42 ++++
.../apache/crunch/impl/mr/exec/CrunchJobTest.java | 42 ----
10 files changed, 310 insertions(+), 351 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
index 223673e..93926c1 100644
--- a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
+++ b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
@@ -18,17 +18,18 @@
package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.StringUtils;
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+
/**
* This class encapsulates a MapReduce job and its dependency. It monitors the
* states of the depending jobs and updates the state of this job. A job starts
@@ -46,48 +47,50 @@ public class CrunchControlledJob {
SUCCESS, WAITING, RUNNING, READY, FAILED, DEPENDENT_FAILED
};
- public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
- protected State state;
- protected Job job; // mapreduce job to be executed.
- // some info for human consumption, e.g. the reason why the job failed
- protected String message;
- private String controlID; // assigned and used by JobControl class
+ public static interface Hook {
+ public void run() throws IOException;
+ }
+
+ private static final Log LOG = LogFactory.getLog(CrunchControlledJob.class);
+
+ private final int jobID;
+ private final Job job; // mapreduce job to be executed.
// the jobs the current job depends on
- private List<CrunchControlledJob> dependingJobs;
+ private final List<CrunchControlledJob> dependingJobs;
+ private final Hook prepareHook;
+ private final Hook completionHook;
+ private State state;
+ // some info for human consumption, e.g. the reason why the job failed
+ private String message;
+ private String lastKnownProgress;
/**
* Construct a job.
- *
+ *
+ * @param jobID
+ * an ID used to match with its {@link org.apache.crunch.impl.mr.plan.JobPrototype}.
* @param job
* a mapreduce job to be executed.
- * @param dependingJobs
- * an array of jobs the current job depends on
+ * @param prepareHook
+ * a piece of code that will run before this job is submitted.
+ * @param completionHook
+ * a piece of code that will run after this job gets completed.
*/
- public CrunchControlledJob(Job job, List<CrunchControlledJob> dependingJobs)
- throws IOException {
+ public CrunchControlledJob(int jobID, Job job, Hook prepareHook, Hook completionHook) {
+ this.jobID = jobID;
this.job = job;
- this.dependingJobs = dependingJobs;
+ this.dependingJobs = Lists.newArrayList();
+ this.prepareHook = prepareHook;
+ this.completionHook = completionHook;
this.state = State.WAITING;
- this.controlID = "unassigned";
this.message = "just initialized";
}
- /**
- * Construct a job.
- *
- * @param conf
- * mapred job configuration representing a job to be executed.
- * @throws IOException
- */
- public CrunchControlledJob(Configuration conf) throws IOException {
- this(new Job(conf), null);
- }
-
@Override
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("job name:\t").append(this.job.getJobName()).append("\n");
- sb.append("job id:\t").append(this.controlID).append("\n");
+ sb.append("job id:\t").append(this.jobID).append("\n");
sb.append("job state:\t").append(this.state).append("\n");
sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n");
sb.append("job message:\t").append(this.message).append("\n");
@@ -114,7 +117,7 @@ public class CrunchControlledJob {
/**
* Set the job name for this job.
- *
+ *
* @param jobName
* the job name
*/
@@ -123,20 +126,10 @@ public class CrunchControlledJob {
}
/**
- * @return the job ID of this job assigned by JobControl
- */
- public String getJobID() {
- return this.controlID;
- }
-
- /**
- * Set the job ID for this job.
- *
- * @param id
- * the job ID
+ * @return the job ID of this job
*/
- public void setJobID(String id) {
- this.controlID = id;
+ public int getJobID() {
+ return this.jobID;
}
/**
@@ -154,16 +147,6 @@ public class CrunchControlledJob {
}
/**
- * Set the mapreduce job
- *
- * @param job
- * the mapreduce job for this job.
- */
- public synchronized void setJob(Job job) {
- this.job = job;
- }
-
- /**
* @return the state of this job
*/
public synchronized State getJobState() {
@@ -214,9 +197,6 @@ public class CrunchControlledJob {
*/
public synchronized boolean addDependingJob(CrunchControlledJob dependingJob) {
if (this.state == State.WAITING) { // only allowed to add jobs when waiting
- if (this.dependingJobs == null) {
- this.dependingJobs = new ArrayList<CrunchControlledJob>();
- }
return this.dependingJobs.add(dependingJob);
} else {
return false;
@@ -246,7 +226,7 @@ public class CrunchControlledJob {
* Check the state of this running job. The state may remain the same, become
* SUCCEEDED or FAILED.
*/
- protected void checkRunningState() throws IOException, InterruptedException {
+ private void checkRunningState() throws IOException, InterruptedException {
try {
if (job.isComplete()) {
if (job.isSuccessful()) {
@@ -255,6 +235,11 @@ public class CrunchControlledJob {
this.state = State.FAILED;
this.message = "Job failed!";
}
+ } else {
+ // still running
+ if (job.getConfiguration().getBoolean(RuntimeParameters.LOG_JOB_PROGRESS, false)) {
+ logJobProgress();
+ }
}
} catch (IOException ioe) {
this.state = State.FAILED;
@@ -266,6 +251,16 @@ public class CrunchControlledJob {
} catch (IOException e) {
}
}
+ // The completion hook publishes the job's output (see CrunchJobHooks.CompletionHook), so run it
+ // only for a successful job, and treat a failure to publish the output as a job failure.
+ if (this.state == State.SUCCESS) {
+ try {
+ completionHook.run();
+ } catch (IOException ioe) {
+ this.state = State.FAILED;
+ this.message = StringUtils.stringifyException(ioe);
+ }
+ }
}
/**
@@ -313,26 +308,25 @@ public class CrunchControlledJob {
*/
protected synchronized void submit() {
try {
- Configuration conf = job.getConfiguration();
- if (conf.getBoolean(CREATE_DIR, false)) {
- Path[] inputPaths = FileInputFormat.getInputPaths(job);
- for (Path inputPath : inputPaths) {
- FileSystem fs = inputPath.getFileSystem(conf);
- if (!fs.exists(inputPath)) {
- try {
- fs.mkdirs(inputPath);
- } catch (IOException e) {
-
- }
- }
- }
- }
+ prepareHook.run();
job.submit();
this.state = State.RUNNING;
+ LOG.info("Running job \"" + getJobName() + "\"");
+ LOG.info("Job status available at: " + job.getTrackingURL());
} catch (Exception ioe) {
this.state = State.FAILED;
this.message = StringUtils.stringifyException(ioe);
+ LOG.error("Error occurred starting job \"" + getJobName() + "\":");
+ LOG.error(getMessage());
}
}
+ private void logJobProgress() throws IOException, InterruptedException {
+ String progress = String.format("map %.0f%% reduce %.0f%%",
+ 100.0 * job.mapProgress(), 100.0 * job.reduceProgress());
+ if (!Objects.equal(lastKnownProgress, progress)) {
+ LOG.info(job.getJobName() + " progress: " + progress);
+ lastKnownProgress = progress;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
index 0342ad4..727ab6f 100644
--- a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
+++ b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
@@ -19,7 +19,6 @@ package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
@@ -40,16 +39,15 @@ import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.Sta
*/
public class CrunchJobControl {
- private Map<String, CrunchControlledJob> waitingJobs;
- private Map<String, CrunchControlledJob> readyJobs;
- private Map<String, CrunchControlledJob> runningJobs;
- private Map<String, CrunchControlledJob> successfulJobs;
- private Map<String, CrunchControlledJob> failedJobs;
+ private Map<Integer, CrunchControlledJob> waitingJobs;
+ private Map<Integer, CrunchControlledJob> readyJobs;
+ private Map<Integer, CrunchControlledJob> runningJobs;
+ private Map<Integer, CrunchControlledJob> successfulJobs;
+ private Map<Integer, CrunchControlledJob> failedJobs;
private Log log = LogFactory.getLog(CrunchJobControl.class);
- private long nextJobID;
- private String groupName;
+ private final String groupName;
/**
* Construct a job control for a group of jobs.
@@ -58,16 +56,15 @@ public class CrunchJobControl {
* a name identifying this group
*/
public CrunchJobControl(String groupName) {
- this.waitingJobs = new Hashtable<String, CrunchControlledJob>();
- this.readyJobs = new Hashtable<String, CrunchControlledJob>();
- this.runningJobs = new Hashtable<String, CrunchControlledJob>();
- this.successfulJobs = new Hashtable<String, CrunchControlledJob>();
- this.failedJobs = new Hashtable<String, CrunchControlledJob>();
- this.nextJobID = -1;
+ this.waitingJobs = new Hashtable<Integer, CrunchControlledJob>();
+ this.readyJobs = new Hashtable<Integer, CrunchControlledJob>();
+ this.runningJobs = new Hashtable<Integer, CrunchControlledJob>();
+ this.successfulJobs = new Hashtable<Integer, CrunchControlledJob>();
+ this.failedJobs = new Hashtable<Integer, CrunchControlledJob>();
this.groupName = groupName;
}
- private static List<CrunchControlledJob> toList(Map<String, CrunchControlledJob> jobs) {
+ private static List<CrunchControlledJob> toList(Map<Integer, CrunchControlledJob> jobs) {
ArrayList<CrunchControlledJob> retv = new ArrayList<CrunchControlledJob>();
synchronized (jobs) {
for (CrunchControlledJob job : jobs.values()) {
@@ -109,25 +106,20 @@ public class CrunchJobControl {
return toList(this.failedJobs);
}
- private String getNextJobID() {
- nextJobID += 1;
- return this.groupName + this.nextJobID;
- }
-
private static void addToQueue(CrunchControlledJob aJob,
- Map<String, CrunchControlledJob> queue) {
+ Map<Integer, CrunchControlledJob> queue) {
synchronized (queue) {
queue.put(aJob.getJobID(), aJob);
}
}
private void addToQueue(CrunchControlledJob aJob) {
- Map<String, CrunchControlledJob> queue = getQueue(aJob.getJobState());
+ Map<Integer, CrunchControlledJob> queue = getQueue(aJob.getJobState());
addToQueue(aJob, queue);
}
- private Map<String, CrunchControlledJob> getQueue(State state) {
- Map<String, CrunchControlledJob> retv = null;
+ private Map<Integer, CrunchControlledJob> getQueue(State state) {
+ Map<Integer, CrunchControlledJob> retv = null;
if (state == State.WAITING) {
retv = this.waitingJobs;
} else if (state == State.READY) {
@@ -148,31 +140,17 @@ public class CrunchJobControl {
* @param aJob
* the new job
*/
- synchronized public String addJob(CrunchControlledJob aJob) {
- String id = this.getNextJobID();
- aJob.setJobID(id);
+ synchronized public void addJob(CrunchControlledJob aJob) {
aJob.setJobState(State.WAITING);
this.addToQueue(aJob);
- return id;
- }
-
- /**
- * Add a collection of jobs
- *
- * @param jobs
- */
- public void addJobCollection(Collection<CrunchControlledJob> jobs) {
- for (CrunchControlledJob job : jobs) {
- addJob(job);
- }
}
synchronized private void checkRunningJobs() throws IOException,
InterruptedException {
- Map<String, CrunchControlledJob> oldJobs = null;
+ Map<Integer, CrunchControlledJob> oldJobs = null;
oldJobs = this.runningJobs;
- this.runningJobs = new Hashtable<String, CrunchControlledJob>();
+ this.runningJobs = new Hashtable<Integer, CrunchControlledJob>();
for (CrunchControlledJob nextJob : oldJobs.values()) {
nextJob.checkState();
@@ -182,9 +160,9 @@ public class CrunchJobControl {
synchronized private void checkWaitingJobs() throws IOException,
InterruptedException {
- Map<String, CrunchControlledJob> oldJobs = null;
+ Map<Integer, CrunchControlledJob> oldJobs = null;
oldJobs = this.waitingJobs;
- this.waitingJobs = new Hashtable<String, CrunchControlledJob>();
+ this.waitingJobs = new Hashtable<Integer, CrunchControlledJob>();
for (CrunchControlledJob nextJob : oldJobs.values()) {
nextJob.checkState();
@@ -193,9 +171,9 @@ public class CrunchJobControl {
}
synchronized private void startReadyJobs() {
- Map<String, CrunchControlledJob> oldJobs = null;
+ Map<Integer, CrunchControlledJob> oldJobs = null;
oldJobs = this.readyJobs;
- this.readyJobs = new Hashtable<String, CrunchControlledJob>();
+ this.readyJobs = new Hashtable<Integer, CrunchControlledJob>();
for (CrunchControlledJob nextJob : oldJobs.values()) {
// Submitting Job to Hadoop
http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
deleted file mode 100644
index f0e5cd1..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.exec;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
-import org.apache.crunch.impl.mr.plan.MSCROutputHandler;
-import org.apache.crunch.impl.mr.plan.PlanningParameters;
-import org.apache.crunch.impl.mr.run.RuntimeParameters;
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.PathTarget;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.StringUtils;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
-
-public class CrunchJob extends CrunchControlledJob {
-
- private final Log log = LogFactory.getLog(CrunchJob.class);
-
- private final Path workingPath;
- private final Map<Integer, PathTarget> multiPaths;
- private final boolean mapOnlyJob;
- private String lastKnownProgress;
-
- public CrunchJob(Job job, Path workingPath, MSCROutputHandler handler) throws IOException {
- super(job, Lists.<CrunchControlledJob> newArrayList());
- this.workingPath = workingPath;
- this.multiPaths = handler.getMultiPaths();
- this.mapOnlyJob = handler.isMapOnlyJob();
- }
-
- private synchronized void handleMultiPaths() throws IOException {
- if (!multiPaths.isEmpty()) {
- // Need to handle moving the data from the output directory of the
- // job to the output locations specified in the paths.
- FileSystem srcFs = workingPath.getFileSystem(job.getConfiguration());
- for (Map.Entry<Integer, PathTarget> entry : multiPaths.entrySet()) {
- final int i = entry.getKey();
- final Path dst = entry.getValue().getPath();
- FileNamingScheme fileNamingScheme = entry.getValue().getFileNamingScheme();
-
- Path src = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + i + "-*");
- Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src);
- Configuration conf = job.getConfiguration();
- FileSystem dstFs = dst.getFileSystem(conf);
- if (!dstFs.exists(dst)) {
- dstFs.mkdirs(dst);
- }
- boolean sameFs = isCompatible(srcFs, dst);
- for (Path s : srcs) {
- Path d = getDestFile(conf, s, dst, fileNamingScheme);
- if (sameFs) {
- srcFs.rename(s, d);
- } else {
- FileUtil.copy(srcFs, s, dstFs, d, true, true, job.getConfiguration());
- }
- }
- }
- }
- }
-
- private boolean isCompatible(FileSystem fs, Path path) {
- try {
- fs.makeQualified(path);
- return true;
- } catch (IllegalArgumentException e) {
- return false;
- }
- }
-
- private Path getDestFile(Configuration conf, Path src, Path dir, FileNamingScheme fileNamingScheme)
- throws IOException {
- String outputFilename = null;
- if (mapOnlyJob) {
- outputFilename = fileNamingScheme.getMapOutputName(conf, dir);
- } else {
- outputFilename = fileNamingScheme.getReduceOutputName(conf, dir, CrunchJob.extractPartitionNumber(src.getName()));
- }
- if (src.getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) {
- outputFilename += org.apache.avro.mapred.AvroOutputFormat.EXT;
- }
- return new Path(dir, outputFilename);
- }
-
- @Override
- protected void checkRunningState() throws IOException, InterruptedException {
- try {
- if (job.isComplete()) {
- if (job.isSuccessful()) {
- handleMultiPaths();
- this.state = State.SUCCESS;
- } else {
- this.state = State.FAILED;
- this.message = "Job failed!";
- }
- } else { // still running
- if (job.getConfiguration().getBoolean(RuntimeParameters.LOG_JOB_PROGRESS, false)) {
- logJobProgress();
- }
- }
- } catch (IOException ioe) {
- this.state = State.FAILED;
- this.message = StringUtils.stringifyException(ioe);
- try {
- if (job != null) {
- job.killJob();
- }
- } catch (IOException e) {
- }
- }
- }
-
- @Override
- protected synchronized void submit() {
- super.submit();
- if (this.state == State.RUNNING) {
- log.info("Running job \"" + getJobName() + "\"");
- log.info("Job status available at: " + job.getTrackingURL());
- } else {
- log.info("Error occurred starting job \"" + getJobName() + "\":");
- log.info(getMessage());
- }
- }
-
- private void logJobProgress() throws IOException, InterruptedException {
- String progress = String.format("map %.0f%% reduce %.0f%%",
- 100.0 * job.mapProgress(), 100.0 * job.reduceProgress());
- if (!Objects.equal(lastKnownProgress, progress)) {
- log.info(job.getJobName() + " progress: " + progress);
- lastKnownProgress = progress;
- }
- }
-
- /**
- * Extract the partition number from a raw reducer output filename.
- *
- * @param fileName The raw reducer output file name
- * @return The partition number encoded in the filename
- */
- static int extractPartitionNumber(String reduceOutputFileName) {
- Matcher matcher = Pattern.compile(".*-r-(\\d{5})").matcher(reduceOutputFileName);
- if (matcher.find()) {
- return Integer.parseInt(matcher.group(1), 10);
- } else {
- throw new IllegalArgumentException("Reducer output name '" + reduceOutputFileName + "' cannot be parsed");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
new file mode 100644
index 0000000..74bc9ac
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.exec;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
+import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.PathTarget;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+public final class CrunchJobHooks {
+
+ private CrunchJobHooks() {}
+
+ /** Captures the five-digit partition number that follows "-r-" in a raw reducer output file name. */
+ private static final Pattern REDUCE_OUTPUT_PATTERN = Pattern.compile(".*-r-(\\d{5})");
+
+ /** Before submission, creates any missing input directories when {@link RuntimeParameters#CREATE_DIR} is enabled. */
+ public static final class PrepareHook implements CrunchControlledJob.Hook {
+ private final Job job;
+
+ public PrepareHook(Job job) {
+ this.job = job;
+ }
+
+ @Override
+ public void run() throws IOException {
+ Configuration conf = job.getConfiguration();
+ if (conf.getBoolean(RuntimeParameters.CREATE_DIR, false)) {
+ Path[] inputPaths = FileInputFormat.getInputPaths(job);
+ for (Path inputPath : inputPaths) {
+ FileSystem fs = inputPath.getFileSystem(conf);
+ if (!fs.exists(inputPath)) {
+ try {
+ fs.mkdirs(inputPath);
+ } catch (IOException ignored) {
+ // Deliberately best effort: the error is dropped and a still-missing path is left for the job itself to report.
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /** Moves output files produced by the MapReduce job into their target directories. */
+ public static final class CompletionHook implements CrunchControlledJob.Hook {
+ private final Job job;
+ private final Path workingPath;
+ private final Map<Integer, PathTarget> multiPaths;
+ private final boolean mapOnlyJob;
+
+ public CompletionHook(Job job, Path workingPath, Map<Integer, PathTarget> multiPaths, boolean mapOnlyJob) {
+ this.job = job;
+ this.workingPath = workingPath;
+ this.multiPaths = multiPaths;
+ this.mapOnlyJob = mapOnlyJob;
+ }
+
+ @Override
+ public void run() throws IOException {
+ handleMultiPaths();
+ }
+
+ /** Moves each multi-output's files from the working path to the target registered for its index. */
+ private synchronized void handleMultiPaths() throws IOException {
+ if (!multiPaths.isEmpty()) {
+ // Need to handle moving the data from the output directory of the
+ // job to the output locations specified in the paths.
+ FileSystem srcFs = workingPath.getFileSystem(job.getConfiguration());
+ for (Map.Entry<Integer, PathTarget> entry : multiPaths.entrySet()) {
+ final int i = entry.getKey();
+ final Path dst = entry.getValue().getPath();
+ FileNamingScheme fileNamingScheme = entry.getValue().getFileNamingScheme();
+
+ Path src = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + i + "-*");
+ Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src);
+ Configuration conf = job.getConfiguration();
+ FileSystem dstFs = dst.getFileSystem(conf);
+ if (!dstFs.exists(dst)) {
+ dstFs.mkdirs(dst);
+ }
+ boolean sameFs = isCompatible(srcFs, dst);
+ for (Path s : srcs) {
+ Path d = getDestFile(conf, s, dst, fileNamingScheme);
+ if (sameFs) {
+ // rename() reports failure by returning false; don't let output silently go missing.
+ if (!srcFs.rename(s, d)) {
+ throw new IOException("Failed to rename " + s + " to " + d);
+ }
+ } else {
+ FileUtil.copy(srcFs, s, dstFs, d, true, true, job.getConfiguration());
+ }
+ }
+ }
+ }
+ }
+
+ /** Returns true when {@code fs} accepts {@code path} (makeQualified does not reject it), so a rename can be used. */
+ private boolean isCompatible(FileSystem fs, Path path) {
+ try {
+ fs.makeQualified(path);
+ return true;
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ }
+
+ /** Builds the destination path for {@code src} inside {@code dir} using the target's naming scheme, keeping any Avro extension. */
+ private Path getDestFile(Configuration conf, Path src, Path dir, FileNamingScheme fileNamingScheme)
+ throws IOException {
+ String outputFilename = null;
+ if (mapOnlyJob) {
+ outputFilename = fileNamingScheme.getMapOutputName(conf, dir);
+ } else {
+ outputFilename = fileNamingScheme.getReduceOutputName(conf, dir, extractPartitionNumber(src.getName()));
+ }
+ if (src.getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) {
+ outputFilename += org.apache.avro.mapred.AvroOutputFormat.EXT;
+ }
+ return new Path(dir, outputFilename);
+ }
+ }
+
+ /**
+ * Extract the partition number from a raw reducer output filename.
+ *
+ * @param reduceOutputFileName The raw reducer output file name
+ * @return The partition number encoded in the filename
+ * @throws IllegalArgumentException if the name has no "-r-" followed by five digits
+ */
+ static int extractPartitionNumber(String reduceOutputFileName) {
+ Matcher matcher = REDUCE_OUTPUT_PATTERN.matcher(reduceOutputFileName);
+ if (matcher.find()) {
+ return Integer.parseInt(matcher.group(1), 10);
+ } else {
+ throw new IllegalArgumentException("Reducer output name '" + reduceOutputFileName + "' cannot be parsed");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index a784b66..4c7b7ea 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -80,7 +80,7 @@ public class MRExecutor implements PipelineExecution {
: new CappedExponentialCounter(500, 10000);
}
- public void addJob(CrunchJob job) {
+ public void addJob(CrunchControlledJob job) {
this.control.addJob(job);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index 181468f..f22b5a1 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -25,10 +25,11 @@ import java.util.Set;
import org.apache.crunch.Pipeline;
import org.apache.crunch.Target;
+import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
import org.apache.crunch.impl.mr.collect.DoTableImpl;
import org.apache.crunch.impl.mr.collect.PCollectionImpl;
import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
-import org.apache.crunch.impl.mr.exec.CrunchJob;
+import org.apache.crunch.impl.mr.exec.CrunchJobHooks;
import org.apache.crunch.impl.mr.run.CrunchCombiner;
import org.apache.crunch.impl.mr.run.CrunchInputFormat;
import org.apache.crunch.impl.mr.run.CrunchMapper;
@@ -49,14 +50,16 @@ import com.google.common.collect.Sets;
class JobPrototype {
- public static JobPrototype createMapReduceJob(PGroupedTableImpl<?, ?> group, Set<NodePath> inputs, Path workingPath) {
- return new JobPrototype(inputs, group, workingPath);
+ public static JobPrototype createMapReduceJob(int jobID, PGroupedTableImpl<?, ?> group,
+ Set<NodePath> inputs, Path workingPath) {
+ return new JobPrototype(jobID, inputs, group, workingPath);
}
- public static JobPrototype createMapOnlyJob(HashMultimap<Target, NodePath> mapNodePaths, Path workingPath) {
- return new JobPrototype(mapNodePaths, workingPath);
+ public static JobPrototype createMapOnlyJob(int jobID, HashMultimap<Target, NodePath> mapNodePaths, Path workingPath) {
+ return new JobPrototype(jobID, mapNodePaths, workingPath);
}
+ private final int jobID; // TODO: maybe stageID sounds better
private final Set<NodePath> mapNodePaths;
private final PGroupedTableImpl<?, ?> group;
private final Set<JobPrototype> dependencies = Sets.newHashSet();
@@ -66,22 +69,28 @@ class JobPrototype {
private HashMultimap<Target, NodePath> targetsToNodePaths;
private DoTableImpl<?, ?> combineFnTable;
- private CrunchJob job;
+ private CrunchControlledJob job;
- private JobPrototype(Set<NodePath> inputs, PGroupedTableImpl<?, ?> group, Path workingPath) {
+ private JobPrototype(int jobID, Set<NodePath> inputs, PGroupedTableImpl<?, ?> group, Path workingPath) {
+ this.jobID = jobID;
this.mapNodePaths = ImmutableSet.copyOf(inputs);
this.group = group;
this.workingPath = workingPath;
this.targetsToNodePaths = null;
}
- private JobPrototype(HashMultimap<Target, NodePath> outputPaths, Path workingPath) {
+ private JobPrototype(int jobID, HashMultimap<Target, NodePath> outputPaths, Path workingPath) {
+ this.jobID = jobID;
this.group = null;
this.mapNodePaths = null;
this.workingPath = workingPath;
this.targetsToNodePaths = outputPaths;
}
+ public int getJobID() {
+ return jobID;
+ }
+
public boolean isMapOnly() {
return this.group == null;
}
@@ -109,7 +118,7 @@ class JobPrototype {
this.dependencies.add(dependency);
}
- public CrunchJob getCrunchJob(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException {
+ public CrunchControlledJob getCrunchJob(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException {
if (job == null) {
job = build(jarClass, conf, pipeline);
for (JobPrototype proto : dependencies) {
@@ -119,7 +128,7 @@ class JobPrototype {
return job;
}
- private CrunchJob build(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException {
+ private CrunchControlledJob build(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException {
Job job = new Job(conf);
conf = job.getConfiguration();
conf.set(PlanningParameters.CRUNCH_WORKING_DIRECTORY, workingPath.toString());
@@ -190,7 +199,11 @@ class JobPrototype {
}
job.setJobName(createJobName(pipeline.getName(), inputNodes, reduceNode));
- return new CrunchJob(job, outputPath, outputHandler);
+ return new CrunchControlledJob(
+ jobID,
+ job,
+ new CrunchJobHooks.PrepareHook(job),
+ new CrunchJobHooks.CompletionHook(job, outputPath, outputHandler.getMultiPaths(), group == null));
}
private void serialize(List<DoNode> nodes, Configuration conf, Path workingPath, NodeContext context)
http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 146bcbf..3e1de38 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -46,7 +46,8 @@ public class MSCRPlanner {
private final MRPipeline pipeline;
private final Map<PCollectionImpl<?>, Set<Target>> outputs;
private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
-
+ private int lastJobID = 0;
+
public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs,
Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) {
this.pipeline = pipeline;
@@ -267,7 +268,7 @@ public class MSCRPlanner {
throw new IllegalStateException("No outputs?");
}
JobPrototype prototype = JobPrototype.createMapOnlyJob(
- outputPaths, pipeline.createTempPath());
+ ++lastJobID, outputPaths, pipeline.createTempPath());
for (Vertex v : component) {
assignment.put(v, prototype);
}
@@ -280,7 +281,7 @@ public class MSCRPlanner {
usedEdges.add(e);
}
JobPrototype prototype = JobPrototype.createMapReduceJob(
- (PGroupedTableImpl) g.getPCollection(), inputs, pipeline.createTempPath());
+ ++lastJobID, (PGroupedTableImpl) g.getPCollection(), inputs, pipeline.createTempPath());
assignment.put(g, prototype);
for (Edge e : g.getIncomingEdges()) {
assignment.put(e.getHead(), prototype);
@@ -335,7 +336,7 @@ public class MSCRPlanner {
}
if (!outputPaths.isEmpty()) {
JobPrototype prototype = JobPrototype.createMapOnlyJob(
- outputPaths, pipeline.createTempPath());
+ ++lastJobID, outputPaths, pipeline.createTempPath());
for (Vertex orphan : orphans) {
assignment.put(orphan, prototype);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
index 1ee19e7..604c49c 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
@@ -30,6 +30,8 @@ public class RuntimeParameters {
public static final String LOG_JOB_PROGRESS = "crunch.log.job.progress";
+ public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
+
// Not instantiated
private RuntimeParameters() {
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java
new file mode 100644
index 0000000..f03c3e2
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.exec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for {@code CrunchJobHooks.extractPartitionNumber(String)}.
+ */
+public class CrunchJobHooksTest {
+
+  /** Asserts that parsing {@code fileName} yields the partition number {@code expected}. */
+  private static void assertPartition(int expected, String fileName) {
+    assertEquals(expected, CrunchJobHooks.extractPartitionNumber(fileName));
+  }
+
+  @Test
+  public void testExtractPartitionNumber() {
+    assertPartition(0, "out1-r-00000");
+    assertPartition(10, "out2-r-00010");
+    assertPartition(99999, "out3-r-99999");
+  }
+
+  @Test
+  public void testExtractPartitionNumber_WithSuffix() {
+    // A trailing extension such as ".avro" must not change the parsed number.
+    assertPartition(10, "out2-r-00010.avro");
+  }
+
+  @Test
+  public void testExtractPartitionNumber_MapOutputFile() {
+    // Names with a "-m-" segment (map output files) must be rejected.
+    try {
+      CrunchJobHooks.extractPartitionNumber("out1-m-00000");
+      fail("Expected IllegalArgumentException for a map output file name");
+    } catch (IllegalArgumentException expected) {
+      // Intentionally ignored: this exception is the outcome under test.
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobTest.java
deleted file mode 100644
index 00ad830..0000000
--- a/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.exec;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-public class CrunchJobTest {
-
- @Test
- public void testExtractPartitionNumber() {
- assertEquals(0, CrunchJob.extractPartitionNumber("out1-r-00000"));
- assertEquals(10, CrunchJob.extractPartitionNumber("out2-r-00010"));
- assertEquals(99999, CrunchJob.extractPartitionNumber("out3-r-99999"));
- }
-
- @Test
- public void testExtractPartitionNumber_WithSuffix() {
- assertEquals(10, CrunchJob.extractPartitionNumber("out2-r-00010.avro"));
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testExtractPartitionNumber_MapOutputFile() {
- CrunchJob.extractPartitionNumber("out1-m-00000");
- }
-}