You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by ch...@apache.org on 2013/05/08 07:36:30 UTC

git commit: CRUNCH-202: Add MRPipelineExecution to expose some MR-specific APIs

Updated Branches:
  refs/heads/master 6b3f2894e -> d864f2fd4


CRUNCH-202: Add MRPipelineExecution to expose some MR-specific APIs


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d864f2fd
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d864f2fd
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d864f2fd

Branch: refs/heads/master
Commit: d864f2fd4634f62b2e5684169becd9f599c6d5e3
Parents: 6b3f289
Author: Chao Shi <ch...@apache.org>
Authored: Wed May 8 13:27:07 2013 +0800
Committer: Chao Shi <ch...@apache.org>
Committed: Wed May 8 13:27:07 2013 +0800

----------------------------------------------------------------------
 .../lib/jobcontrol/CrunchControlledJob.java        |   36 +++++-------
 .../mapreduce/lib/jobcontrol/CrunchJobControl.java |   16 +++++-
 .../main/java/org/apache/crunch/impl/mr/MRJob.java |   45 +++++++++++++++
 .../java/org/apache/crunch/impl/mr/MRPipeline.java |    4 +-
 .../apache/crunch/impl/mr/MRPipelineExecution.java |   28 +++++++++
 .../org/apache/crunch/impl/mr/exec/MRExecutor.java |   18 +++++-
 .../apache/crunch/impl/mr/plan/DotfileWriter.java  |    7 +-
 7 files changed, 125 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/d864f2fd/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
index 93926c1..0038ab7 100644
--- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
@@ -22,11 +22,13 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.impl.mr.MRJob;
 import org.apache.crunch.impl.mr.run.RuntimeParameters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.base.Function;
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 
@@ -34,18 +36,13 @@ import com.google.common.collect.Lists;
  * This class encapsulates a MapReduce job and its dependency. It monitors the
  * states of the depending jobs and updates the state of this job. A job starts
  * in the WAITING state. If it does not have any depending jobs, or all of the
- * depending jobs are in SUCCEEDED state, then the job state will become READY. If
+ * depending jobs are in SUCCESS state, then the job state will become READY. If
  * any depending jobs fail, the job will fail too. When in READY state, the job
  * can be submitted to Hadoop for execution, with the state changing into
  * RUNNING state. From RUNNING state, the job can get into SUCCEEDED or FAILED
  * state, depending the status of the job execution.
  */
-public class CrunchControlledJob {
-
-  // A job will be in one of the following states
-  public static enum State {
-    SUCCESS, WAITING, RUNNING, READY, FAILED, DEPENDENT_FAILED
-  };
+public class CrunchControlledJob implements MRJob {
 
   public static interface Hook {
     public void run() throws IOException;
@@ -139,16 +136,22 @@ public class CrunchControlledJob {
     return this.job.getJobID();
   }
 
-  /**
-   * @return the mapreduce job
-   */
+  @Override
   public synchronized Job getJob() {
     return this.job;
   }
 
-  /**
-   * @return the state of this job
-   */
+  @Override
+  public List<MRJob> getDependentJobs() {
+    return Lists.transform(dependingJobs, new Function<CrunchControlledJob, MRJob>() {
+      @Override
+      public MRJob apply(CrunchControlledJob job) {
+        return job;
+      }
+    });
+  }
+
+  @Override
   public synchronized State getJobState() {
     return this.state;
   }
@@ -181,13 +184,6 @@ public class CrunchControlledJob {
   }
 
   /**
-   * @return the depending jobs of this job
-   */
-  public List<CrunchControlledJob> getDependentJobs() {
-    return this.dependingJobs;
-  }
-
-  /**
    * Add a job to this jobs' dependency list. Dependent jobs can only be added
    * while a Job is waiting to run, not during or afterwards.
    * 

http://git-wip-us.apache.org/repos/asf/crunch/blob/d864f2fd/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
index 727ab6f..47cfb94 100644
--- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
@@ -23,9 +23,10 @@ import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.State;
+import org.apache.crunch.impl.mr.MRJob.State;
 
 /**
  * This class encapsulates a set of MapReduce jobs and its dependency.
@@ -106,6 +107,19 @@ public class CrunchJobControl {
     return toList(this.failedJobs);
   }
 
+  /**
+   * @return the jobs in all states
+   */
+  public synchronized List<CrunchControlledJob> getAllJobs() {
+    return ImmutableList.<CrunchControlledJob>builder()
+        .addAll(waitingJobs.values())
+        .addAll(readyJobs.values())
+        .addAll(runningJobs.values())
+        .addAll(successfulJobs.values())
+        .addAll(failedJobs.values())
+        .build();
+  }
+
   private static void addToQueue(CrunchControlledJob aJob,
       Map<Integer, CrunchControlledJob> queue) {
     synchronized (queue) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/d864f2fd/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRJob.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRJob.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRJob.java
new file mode 100644
index 0000000..188b556
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRJob.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr;
+
+import org.apache.hadoop.mapreduce.Job;
+
+import java.util.List;
+
+/**
+ * A Hadoop MapReduce job managed by Crunch.
+ */
+public interface MRJob {
+
+  /** A job will be in one of the following states. */
+  public static enum State {
+    SUCCESS, WAITING, RUNNING, READY, FAILED, DEPENDENT_FAILED
+  };
+
+  /** @return the Job ID assigned by Crunch */
+  int getJobID();
+
+  /** @return the internal Hadoop MapReduce job */
+  Job getJob();
+
+  /** @return the jobs that this job depends on */
+  List<MRJob> getDependentJobs();
+
+  /** @return the state of this job */
+  State getJobState();
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/d864f2fd/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index 00cf486..f167846 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -168,8 +168,8 @@ public class MRPipeline implements Pipeline {
   }
   
   @Override
-  public PipelineExecution runAsync() {
-    PipelineExecution res = plan().execute();
+  public MRPipelineExecution runAsync() {
+    MRPipelineExecution res = plan().execute();
     outputTargets.clear();
     return res;
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/d864f2fd/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java
new file mode 100644
index 0000000..b9d53fe
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr;
+
+import org.apache.crunch.PipelineExecution;
+
+import java.util.List;
+
+public interface MRPipelineExecution extends PipelineExecution {
+
+    List<MRJob> getJobs();
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/d864f2fd/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index 4c7b7ea..9318271 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -27,16 +27,18 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.PipelineExecution;
 import org.apache.crunch.PipelineResult;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
 import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
 import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl;
+import org.apache.crunch.impl.mr.MRJob;
+import org.apache.crunch.impl.mr.MRPipelineExecution;
 import org.apache.crunch.impl.mr.collect.PCollectionImpl;
 import org.apache.crunch.materialize.MaterializableIterable;
 import org.apache.hadoop.conf.Configuration;
 
+import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 
 /**
@@ -48,7 +50,7 @@ import com.google.common.collect.Lists;
  *
  * It is thread-safe.
  */
-public class MRExecutor implements PipelineExecution {
+public class MRExecutor implements MRPipelineExecution {
 
   private static final Log LOG = LogFactory.getLog(MRExecutor.class);
 
@@ -88,7 +90,7 @@ public class MRExecutor implements PipelineExecution {
     this.planDotFile = planDotFile;
   }
   
-  public PipelineExecution execute() {
+  public MRPipelineExecution execute() {
     monitorThread.start();
     return this;
   }
@@ -195,4 +197,14 @@ public class MRExecutor implements PipelineExecution {
         conf.get("mapred.job.tracker", "local"));
     return "local".equals(jobTrackerAddress);
   }
+
+  @Override
+  public List<MRJob> getJobs() {
+    return Lists.transform(control.getAllJobs(), new Function<CrunchControlledJob, MRJob>() {
+      @Override
+      public MRJob apply(CrunchControlledJob job) {
+        return job;
+      }
+    });
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/d864f2fd/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
index 46d8c53..9541b99 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
@@ -201,19 +201,20 @@ public class DotfileWriter {
   public String buildDotfile() {
     StringBuilder stringBuilder = new StringBuilder();
     stringBuilder.append("digraph G {\n");
-    int clusterIndex = 0;
 
     for (String globalDeclaration : globalNodeDeclarations) {
       stringBuilder.append(String.format("  %s\n", globalDeclaration));
     }
 
     for (JobPrototype jobPrototype : jobPrototypes){
+      // Subgraph names must start with "cluster": Graphviz only draws a border around cluster subgraphs.
       StringBuilder jobProtoStringBuilder = new StringBuilder();
-      jobProtoStringBuilder.append(String.format("  subgraph cluster%d {\n", clusterIndex++));
+      jobProtoStringBuilder.append(String.format("  subgraph \"cluster-job%d\" {\n", jobPrototype.getJobID()));
       for (MRTaskType taskType : MRTaskType.values()){
         Pair<JobPrototype,MRTaskType> jobTaskKey = Pair.of(jobPrototype, taskType);
         if (jobNodeDeclarations.containsKey(jobTaskKey)){
-          jobProtoStringBuilder.append(String.format("    subgraph cluster%d {\n", clusterIndex++));
+          jobProtoStringBuilder.append(String.format(
+              "    subgraph \"cluster-job%d-%s\" {\n", jobPrototype.getJobID(), taskType.name().toLowerCase()));
           jobProtoStringBuilder.append(String.format("      %s\n", getTaskGraphAttributes(taskType)));
           for (String declarationEntry : jobNodeDeclarations.get(jobTaskKey)){
             jobProtoStringBuilder.append(String.format("      %s\n", declarationEntry));