You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by ch...@apache.org on 2013/12/04 15:27:39 UTC

[1/2] git commit: CRUNCH-307: Limit the number of concurrently running jobs

Updated Branches:
  refs/heads/master 23bad11d6 -> 10bf70489


CRUNCH-307: Limit the number of concurrently running jobs


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/96e39453
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/96e39453
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/96e39453

Branch: refs/heads/master
Commit: 96e3945383d15a5d70b7a6bed2f02fcd0d79e58e
Parents: 2a8b6c1
Author: Chao Shi <ch...@apache.org>
Authored: Wed Dec 4 21:26:19 2013 +0800
Committer: Chao Shi <ch...@apache.org>
Committed: Wed Dec 4 21:26:19 2013 +0800

----------------------------------------------------------------------
 .../lib/jobcontrol/CrunchJobControl.java        | 14 +++-
 .../apache/crunch/impl/mr/exec/MRExecutor.java  |  7 +-
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java |  2 +-
 .../crunch/impl/mr/run/RuntimeParameters.java   |  2 +
 .../lib/jobcontrol/CrunchJobControlTest.java    | 77 ++++++++++++++++++++
 5 files changed, 96 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/96e39453/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
index 47cfb94..ce7a6d9 100644
--- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
@@ -27,6 +27,8 @@ import com.google.common.collect.ImmutableList;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.impl.mr.MRJob.State;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * This class encapsulates a set of MapReduce jobs and its dependency.
@@ -49,6 +51,7 @@ public class CrunchJobControl {
   private Log log = LogFactory.getLog(CrunchJobControl.class);
 
   private final String groupName;
+  private final int maxRunningJobs;
 
   /**
    * Construct a job control for a group of jobs.
@@ -56,13 +59,14 @@ public class CrunchJobControl {
    * @param groupName
    *          a name identifying this group
    */
-  public CrunchJobControl(String groupName) {
+  public CrunchJobControl(Configuration conf, String groupName) {
     this.waitingJobs = new Hashtable<Integer, CrunchControlledJob>();
     this.readyJobs = new Hashtable<Integer, CrunchControlledJob>();
     this.runningJobs = new Hashtable<Integer, CrunchControlledJob>();
     this.successfulJobs = new Hashtable<Integer, CrunchControlledJob>();
     this.failedJobs = new Hashtable<Integer, CrunchControlledJob>();
     this.groupName = groupName;
+    this.maxRunningJobs = conf.getInt(RuntimeParameters.MAX_RUNNING_JOBS, 5);
   }
 
   private static List<CrunchControlledJob> toList(Map<Integer, CrunchControlledJob> jobs) {
@@ -190,8 +194,12 @@ public class CrunchJobControl {
     this.readyJobs = new Hashtable<Integer, CrunchControlledJob>();
 
     for (CrunchControlledJob nextJob : oldJobs.values()) {
-      // Submitting Job to Hadoop
-      nextJob.submit();
+      // Limit the number of concurrently running jobs. Once that limit is reached,
+      // stop submitting new jobs and wait until a running job completes.
+      if (runningJobs.size() < maxRunningJobs) {
+        // Submitting Job to Hadoop
+        nextJob.submit();
+      }
       this.addToQueue(nextJob);
     }
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/96e39453/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index a655b23..38344a2 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -69,9 +69,12 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe
 
   private String planDotFile;
   
-  public MRExecutor(Class<?> jarClass, Map<PCollectionImpl<?>, Set<Target>> outputTargets,
+  public MRExecutor(
+      Configuration conf,
+      Class<?> jarClass,
+      Map<PCollectionImpl<?>, Set<Target>> outputTargets,
       Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) {
-    this.control = new CrunchJobControl(jarClass.toString());
+    this.control = new CrunchJobControl(conf, jarClass.toString());
     this.outputTargets = outputTargets;
     this.toMaterialize = toMaterialize;
     this.monitorThread = new Thread(new Runnable() {

http://git-wip-us.apache.org/repos/asf/crunch/blob/96e39453/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index ac61fec..96c9125 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -159,7 +159,7 @@ public class MSCRPlanner {
     
     // Finally, construct the jobs from the prototypes and return.
     DotfileWriter dotfileWriter = new DotfileWriter();
-    MRExecutor exec = new MRExecutor(jarClass, outputs, toMaterialize);
+    MRExecutor exec = new MRExecutor(conf, jarClass, outputs, toMaterialize);
     for (JobPrototype proto : Sets.newHashSet(assignments.values())) {
       dotfileWriter.addJobPrototype(proto);
       exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline, lastJobID));

http://git-wip-us.apache.org/repos/asf/crunch/blob/96e39453/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
index 987ccd3..0c9f229 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
@@ -36,6 +36,8 @@ public final class RuntimeParameters {
 
   public static final String DISABLE_DEEP_COPY = "crunch.disable.deep.copy";
 
+  public static final String MAX_RUNNING_JOBS = "crunch.max.running.jobs";
+
   // Not instantiated
   private RuntimeParameters() {
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/96e39453/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java b/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
new file mode 100644
index 0000000..562e99d
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol;
+
+import org.apache.crunch.impl.mr.MRJob;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CrunchJobControlTest {
+  @Test
+  public void testMaxRunningJobs() throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    conf.setInt(RuntimeParameters.MAX_RUNNING_JOBS, 2);
+    CrunchJobControl jobControl = new CrunchJobControl(conf, "group");
+    CrunchControlledJob job1 = createJob(1);
+    CrunchControlledJob job2 = createJob(2);
+    CrunchControlledJob job3 = createJob(3);
+
+    // Submit job1 and job2.
+    jobControl.addJob(job1);
+    jobControl.addJob(job2);
+    jobControl.pollJobStatusAndStartNewOnes();
+    verify(job1).submit();
+    verify(job2).submit();
+
+    // Add job3 and expect it to stay pending.
+    jobControl.addJob(job3);
+    jobControl.pollJobStatusAndStartNewOnes();
+    verify(job3, never()).submit();
+
+    // Expect job3 to be submitted after job1 is done.
+    setSuccess(job1);
+    jobControl.pollJobStatusAndStartNewOnes();
+    verify(job3).submit();
+  }
+
+  private CrunchControlledJob createJob(int jobID) throws IOException, InterruptedException {
+    Job mrJob = mock(Job.class);
+    when(mrJob.getConfiguration()).thenReturn(new Configuration());
+    CrunchControlledJob job = new CrunchControlledJob(
+        jobID,
+        mrJob,
+        mock(CrunchControlledJob.Hook.class),
+        mock(CrunchControlledJob.Hook.class));
+    return spy(job);
+  }
+
+  private void setSuccess(CrunchControlledJob job) throws IOException, InterruptedException {
+    when(job.checkState()).thenReturn(MRJob.State.SUCCESS);
+    when(job.getJobState()).thenReturn(MRJob.State.SUCCESS);
+  }
+}


[2/2] git commit: Merge branch 'crunch-307'

Posted by ch...@apache.org.
Merge branch 'crunch-307'


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/10bf7048
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/10bf7048
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/10bf7048

Branch: refs/heads/master
Commit: 10bf70489b6ad132acc09085bc05d17cfefe5e73
Parents: 23bad11 96e3945
Author: Chao Shi <ch...@apache.org>
Authored: Wed Dec 4 22:26:13 2013 +0800
Committer: Chao Shi <ch...@apache.org>
Committed: Wed Dec 4 22:26:13 2013 +0800

----------------------------------------------------------------------
 .../lib/jobcontrol/CrunchJobControl.java        | 14 +++-
 .../apache/crunch/impl/mr/exec/MRExecutor.java  |  7 +-
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java |  2 +-
 .../crunch/impl/mr/run/RuntimeParameters.java   |  2 +
 .../lib/jobcontrol/CrunchJobControlTest.java    | 77 ++++++++++++++++++++
 5 files changed, 96 insertions(+), 6 deletions(-)
----------------------------------------------------------------------