You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by ch...@apache.org on 2013/05/08 07:36:30 UTC
git commit: CRUNCH-202: Add MRPipelineExecution to expose some MR-specific APIs
Updated Branches:
refs/heads/master 6b3f2894e -> d864f2fd4
CRUNCH-202: Add MRPipelineExecution to expose some MR-specific APIs
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d864f2fd
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d864f2fd
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d864f2fd
Branch: refs/heads/master
Commit: d864f2fd4634f62b2e5684169becd9f599c6d5e3
Parents: 6b3f289
Author: Chao Shi <ch...@apache.org>
Authored: Wed May 8 13:27:07 2013 +0800
Committer: Chao Shi <ch...@apache.org>
Committed: Wed May 8 13:27:07 2013 +0800
----------------------------------------------------------------------
.../lib/jobcontrol/CrunchControlledJob.java | 36 +++++-------
.../mapreduce/lib/jobcontrol/CrunchJobControl.java | 16 +++++-
.../main/java/org/apache/crunch/impl/mr/MRJob.java | 45 +++++++++++++++
.../java/org/apache/crunch/impl/mr/MRPipeline.java | 4 +-
.../apache/crunch/impl/mr/MRPipelineExecution.java | 28 +++++++++
.../org/apache/crunch/impl/mr/exec/MRExecutor.java | 18 +++++-
.../apache/crunch/impl/mr/plan/DotfileWriter.java | 7 +-
7 files changed, 125 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/d864f2fd/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
index 93926c1..0038ab7 100644
--- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
@@ -22,11 +22,13 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.impl.mr.MRJob;
import org.apache.crunch.impl.mr.run.RuntimeParameters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.util.StringUtils;
+import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
@@ -34,18 +36,13 @@ import com.google.common.collect.Lists;
* This class encapsulates a MapReduce job and its dependency. It monitors the
* states of the depending jobs and updates the state of this job. A job starts
* in the WAITING state. If it does not have any depending jobs, or all of the
- * depending jobs are in SUCCEEDED state, then the job state will become READY. If
+ * depending jobs are in SUCCESS state, then the job state will become READY. If
* any depending jobs fail, the job will fail too. When in READY state, the job
* can be submitted to Hadoop for execution, with the state changing into
* RUNNING state. From RUNNING state, the job can get into SUCCEEDED or FAILED
* state, depending the status of the job execution.
*/
-public class CrunchControlledJob {
-
- // A job will be in one of the following states
- public static enum State {
- SUCCESS, WAITING, RUNNING, READY, FAILED, DEPENDENT_FAILED
- };
+public class CrunchControlledJob implements MRJob {
public static interface Hook {
public void run() throws IOException;
@@ -139,16 +136,22 @@ public class CrunchControlledJob {
return this.job.getJobID();
}
- /**
- * @return the mapreduce job
- */
+ @Override
public synchronized Job getJob() {
return this.job;
}
- /**
- * @return the state of this job
- */
+ @Override
+ public List<MRJob> getDependentJobs() {
+ return Lists.transform(dependingJobs, new Function<CrunchControlledJob, MRJob>() {
+ @Override
+ public MRJob apply(CrunchControlledJob job) {
+ return job;
+ }
+ });
+ }
+
+ @Override
public synchronized State getJobState() {
return this.state;
}
@@ -181,13 +184,6 @@ public class CrunchControlledJob {
}
/**
- * @return the depending jobs of this job
- */
- public List<CrunchControlledJob> getDependentJobs() {
- return this.dependingJobs;
- }
-
- /**
* Add a job to this jobs' dependency list. Dependent jobs can only be added
* while a Job is waiting to run, not during or afterwards.
*
http://git-wip-us.apache.org/repos/asf/crunch/blob/d864f2fd/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
index 727ab6f..47cfb94 100644
--- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
@@ -23,9 +23,10 @@ import java.util.Hashtable;
import java.util.List;
import java.util.Map;
+import com.google.common.collect.ImmutableList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.State;
+import org.apache.crunch.impl.mr.MRJob.State;
/**
* This class encapsulates a set of MapReduce jobs and its dependency.
@@ -106,6 +107,19 @@ public class CrunchJobControl {
return toList(this.failedJobs);
}
+ /**
+ * @return an immutable snapshot of the jobs in all states (waiting, ready, running, successful, failed)
+ */
+ public synchronized List<CrunchControlledJob> getAllJobs() {
+ return ImmutableList.<CrunchControlledJob>builder()
+ .addAll(waitingJobs.values())
+ .addAll(readyJobs.values())
+ .addAll(runningJobs.values())
+ .addAll(successfulJobs.values())
+ .addAll(failedJobs.values())
+ .build();
+ }
+
private static void addToQueue(CrunchControlledJob aJob,
Map<Integer, CrunchControlledJob> queue) {
synchronized (queue) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/d864f2fd/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRJob.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRJob.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRJob.java
new file mode 100644
index 0000000..188b556
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRJob.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr;
+
+import org.apache.hadoop.mapreduce.Job;
+
+import java.util.List;
+
+/**
+ * A Hadoop MapReduce job managed by Crunch.
+ */
+public interface MRJob {
+
+ /** A job will be in one of the following states. */
+ public static enum State {
+ SUCCESS, WAITING, RUNNING, READY, FAILED, DEPENDENT_FAILED
+ };
+
+ /** @return the Job ID assigned by Crunch */
+ int getJobID();
+
+ /** @return the internal Hadoop MapReduce job */
+ Job getJob();
+
+ /** @return the jobs that this job depends on (must finish before this job can run) */
+ List<MRJob> getDependentJobs();
+
+ /** @return the state of this job */
+ State getJobState();
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d864f2fd/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index 00cf486..f167846 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -168,8 +168,8 @@ public class MRPipeline implements Pipeline {
}
@Override
- public PipelineExecution runAsync() {
- PipelineExecution res = plan().execute();
+ public MRPipelineExecution runAsync() {
+ MRPipelineExecution res = plan().execute();
outputTargets.clear();
return res;
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d864f2fd/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java
new file mode 100644
index 0000000..b9d53fe
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr;
+
+import org.apache.crunch.PipelineExecution;
+
+import java.util.List;
+
+public interface MRPipelineExecution extends PipelineExecution {
+
+ List<MRJob> getJobs();
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d864f2fd/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index 4c7b7ea..9318271 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -27,16 +27,18 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.PipelineExecution;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.Target;
import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl;
+import org.apache.crunch.impl.mr.MRJob;
+import org.apache.crunch.impl.mr.MRPipelineExecution;
import org.apache.crunch.impl.mr.collect.PCollectionImpl;
import org.apache.crunch.materialize.MaterializableIterable;
import org.apache.hadoop.conf.Configuration;
+import com.google.common.base.Function;
import com.google.common.collect.Lists;
/**
@@ -48,7 +50,7 @@ import com.google.common.collect.Lists;
*
* It is thread-safe.
*/
-public class MRExecutor implements PipelineExecution {
+public class MRExecutor implements MRPipelineExecution {
private static final Log LOG = LogFactory.getLog(MRExecutor.class);
@@ -88,7 +90,7 @@ public class MRExecutor implements PipelineExecution {
this.planDotFile = planDotFile;
}
- public PipelineExecution execute() {
+ public MRPipelineExecution execute() {
monitorThread.start();
return this;
}
@@ -195,4 +197,14 @@ public class MRExecutor implements PipelineExecution {
conf.get("mapred.job.tracker", "local"));
return "local".equals(jobTrackerAddress);
}
+
+ @Override
+ public List<MRJob> getJobs() {
+ return Lists.transform(control.getAllJobs(), new Function<CrunchControlledJob, MRJob>() {
+ @Override
+ public MRJob apply(CrunchControlledJob job) {
+ return job;
+ }
+ });
+ }
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d864f2fd/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
index 46d8c53..9541b99 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
@@ -201,19 +201,20 @@ public class DotfileWriter {
public String buildDotfile() {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("digraph G {\n");
- int clusterIndex = 0;
for (String globalDeclaration : globalNodeDeclarations) {
stringBuilder.append(String.format(" %s\n", globalDeclaration));
}
for (JobPrototype jobPrototype : jobPrototypes){
+ // The subgraph name must start with "cluster": Graphviz only lays out and draws a subgraph as a bordered cluster when its name has that prefix.
StringBuilder jobProtoStringBuilder = new StringBuilder();
- jobProtoStringBuilder.append(String.format(" subgraph cluster%d {\n", clusterIndex++));
+ jobProtoStringBuilder.append(String.format(" subgraph \"cluster-job%d\" {\n", jobPrototype.getJobID()));
for (MRTaskType taskType : MRTaskType.values()){
Pair<JobPrototype,MRTaskType> jobTaskKey = Pair.of(jobPrototype, taskType);
if (jobNodeDeclarations.containsKey(jobTaskKey)){
- jobProtoStringBuilder.append(String.format(" subgraph cluster%d {\n", clusterIndex++));
+ jobProtoStringBuilder.append(String.format(
+ " subgraph \"cluster-job%d-%s\" {\n", jobPrototype.getJobID(), taskType.name().toLowerCase()));
jobProtoStringBuilder.append(String.format(" %s\n", getTaskGraphAttributes(taskType)));
for (String declarationEntry : jobNodeDeclarations.get(jobTaskKey)){
jobProtoStringBuilder.append(String.format(" %s\n", declarationEntry));