You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@crunch.apache.org by ch...@apache.org on 2013/12/04 15:27:39 UTC
[1/2] git commit: CRUNCH-307: Limit the number of concurrently running jobs
Updated Branches:
refs/heads/master 23bad11d6 -> 10bf70489
CRUNCH-307: Limit the number of concurrently running jobs
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/96e39453
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/96e39453
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/96e39453
Branch: refs/heads/master
Commit: 96e3945383d15a5d70b7a6bed2f02fcd0d79e58e
Parents: 2a8b6c1
Author: Chao Shi <ch...@apache.org>
Authored: Wed Dec 4 21:26:19 2013 +0800
Committer: Chao Shi <ch...@apache.org>
Committed: Wed Dec 4 21:26:19 2013 +0800
----------------------------------------------------------------------
.../lib/jobcontrol/CrunchJobControl.java | 14 +++-
.../apache/crunch/impl/mr/exec/MRExecutor.java | 7 +-
.../apache/crunch/impl/mr/plan/MSCRPlanner.java | 2 +-
.../crunch/impl/mr/run/RuntimeParameters.java | 2 +
.../lib/jobcontrol/CrunchJobControlTest.java | 77 ++++++++++++++++++++
5 files changed, 96 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/96e39453/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
index 47cfb94..ce7a6d9 100644
--- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
@@ -27,6 +27,8 @@ import com.google.common.collect.ImmutableList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.impl.mr.MRJob.State;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.hadoop.conf.Configuration;
/**
* This class encapsulates a set of MapReduce jobs and its dependency.
@@ -49,6 +51,7 @@ public class CrunchJobControl {
private Log log = LogFactory.getLog(CrunchJobControl.class);
private final String groupName;
+ private final int maxRunningJobs;
/**
* Construct a job control for a group of jobs.
@@ -56,13 +59,14 @@ public class CrunchJobControl {
* @param groupName
* a name identifying this group
*/
- public CrunchJobControl(String groupName) {
+ public CrunchJobControl(Configuration conf, String groupName) { // conf: source of RuntimeParameters.MAX_RUNNING_JOBS ("crunch.max.running.jobs")
this.waitingJobs = new Hashtable<Integer, CrunchControlledJob>();
this.readyJobs = new Hashtable<Integer, CrunchControlledJob>();
this.runningJobs = new Hashtable<Integer, CrunchControlledJob>();
this.successfulJobs = new Hashtable<Integer, CrunchControlledJob>();
this.failedJobs = new Hashtable<Integer, CrunchControlledJob>();
this.groupName = groupName;
+ this.maxRunningJobs = conf.getInt(RuntimeParameters.MAX_RUNNING_JOBS, 5); // defaults to 5; NOTE(review): a value <= 0 makes the ready-job loop's runningJobs.size() < maxRunningJobs check always fail, so no job would be submitted there — consider validating
}
private static List<CrunchControlledJob> toList(Map<Integer, CrunchControlledJob> jobs) {
@@ -190,8 +194,12 @@ public class CrunchJobControl {
this.readyJobs = new Hashtable<Integer, CrunchControlledJob>();
for (CrunchControlledJob nextJob : oldJobs.values()) {
- // Submitting Job to Hadoop
- nextJob.submit();
+ // Limit the number of concurrent running jobs. If we have reached such limit,
+ // stop submitting new jobs and wait until some running job completes.
+ if (runningJobs.size() < maxRunningJobs) {
+ // Submitting Job to Hadoop
+ nextJob.submit();
+ }
this.addToQueue(nextJob);
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/96e39453/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index a655b23..38344a2 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -69,9 +69,12 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe
private String planDotFile;
- public MRExecutor(Class<?> jarClass, Map<PCollectionImpl<?>, Set<Target>> outputTargets,
+ public MRExecutor(
+ Configuration conf,
+ Class<?> jarClass,
+ Map<PCollectionImpl<?>, Set<Target>> outputTargets,
Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) {
- this.control = new CrunchJobControl(jarClass.toString());
+ this.control = new CrunchJobControl(conf, jarClass.toString());
this.outputTargets = outputTargets;
this.toMaterialize = toMaterialize;
this.monitorThread = new Thread(new Runnable() {
http://git-wip-us.apache.org/repos/asf/crunch/blob/96e39453/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index ac61fec..96c9125 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -159,7 +159,7 @@ public class MSCRPlanner {
// Finally, construct the jobs from the prototypes and return.
DotfileWriter dotfileWriter = new DotfileWriter();
- MRExecutor exec = new MRExecutor(jarClass, outputs, toMaterialize);
+ MRExecutor exec = new MRExecutor(conf, jarClass, outputs, toMaterialize);
for (JobPrototype proto : Sets.newHashSet(assignments.values())) {
dotfileWriter.addJobPrototype(proto);
exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline, lastJobID));
http://git-wip-us.apache.org/repos/asf/crunch/blob/96e39453/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
index 987ccd3..0c9f229 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
@@ -36,6 +36,8 @@ public final class RuntimeParameters {
public static final String DISABLE_DEEP_COPY = "crunch.disable.deep.copy";
+ public static final String MAX_RUNNING_JOBS = "crunch.max.running.jobs";
+
// Not instantiated
private RuntimeParameters() {
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/96e39453/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java b/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
new file mode 100644
index 0000000..562e99d
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol;
+
+import org.apache.crunch.impl.mr.MRJob;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CrunchJobControlTest { // Tests that CrunchJobControl honors RuntimeParameters.MAX_RUNNING_JOBS.
+ @Test
+ public void testMaxRunningJobs() throws IOException, InterruptedException { // limit of 2, three jobs: the third must wait for a free slot
+ Configuration conf = new Configuration();
+ conf.setInt(RuntimeParameters.MAX_RUNNING_JOBS, 2);
+ CrunchJobControl jobControl = new CrunchJobControl(conf, "group");
+ CrunchControlledJob job1 = createJob(1);
+ CrunchControlledJob job2 = createJob(2);
+ CrunchControlledJob job3 = createJob(3);
+
+ // First poll: job1 and job2 fit under the limit of 2, so both are submitted.
+ jobControl.addJob(job1);
+ jobControl.addJob(job2);
+ jobControl.pollJobStatusAndStartNewOnes();
+ verify(job1).submit();
+ verify(job2).submit();
+
+ // Second poll: two jobs are still running, so job3 must not be submitted yet.
+ jobControl.addJob(job3);
+ jobControl.pollJobStatusAndStartNewOnes();
+ verify(job3, never()).submit();
+
+ // Third poll: once job1 reports SUCCESS, job3 is expected to be submitted.
+ setSuccess(job1);
+ jobControl.pollJobStatusAndStartNewOnes();
+ verify(job3).submit();
+ }
+
+ private CrunchControlledJob createJob(int jobID) throws IOException, InterruptedException { // Mockito spy over a job built from a mocked MR Job and mocked hooks, so submit() can be verified
+ Job mrJob = mock(Job.class);
+ when(mrJob.getConfiguration()).thenReturn(new Configuration()); // NOTE(review): presumably CrunchControlledJob needs a non-null Configuration from the Job — confirm
+ CrunchControlledJob job = new CrunchControlledJob(
+ jobID,
+ mrJob,
+ mock(CrunchControlledJob.Hook.class),
+ mock(CrunchControlledJob.Hook.class));
+ return spy(job);
+ }
+
+ private void setSuccess(CrunchControlledJob job) throws IOException, InterruptedException { // Stubs the spy's state accessors to report SUCCESS.
+ when(job.checkState()).thenReturn(MRJob.State.SUCCESS); // NOTE(review): when(spy.m()) calls the real m() while stubbing; doReturn(...).when(job).checkState() would avoid that
+ when(job.getJobState()).thenReturn(MRJob.State.SUCCESS);
+ }
+}
[2/2] git commit: Merge branch 'crunch-307'
Posted by ch...@apache.org.
Merge branch 'crunch-307'
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/10bf7048
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/10bf7048
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/10bf7048
Branch: refs/heads/master
Commit: 10bf70489b6ad132acc09085bc05d17cfefe5e73
Parents: 23bad11 96e3945
Author: Chao Shi <ch...@apache.org>
Authored: Wed Dec 4 22:26:13 2013 +0800
Committer: Chao Shi <ch...@apache.org>
Committed: Wed Dec 4 22:26:13 2013 +0800
----------------------------------------------------------------------
.../lib/jobcontrol/CrunchJobControl.java | 14 +++-
.../apache/crunch/impl/mr/exec/MRExecutor.java | 7 +-
.../apache/crunch/impl/mr/plan/MSCRPlanner.java | 2 +-
.../crunch/impl/mr/run/RuntimeParameters.java | 2 +
.../lib/jobcontrol/CrunchJobControlTest.java | 77 ++++++++++++++++++++
5 files changed, 96 insertions(+), 6 deletions(-)
----------------------------------------------------------------------