You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by ch...@apache.org on 2014/02/23 10:38:29 UTC
git commit: CRUNCH-353: When job failure is noticed,
MRPipeline kills all its running jobs and exits immediately. This saves
slot-time.
Repository: crunch
Updated Branches:
refs/heads/master 1b8b15315 -> 0da8e3c3e
CRUNCH-353: When job failure is noticed, MRPipeline kills all its running jobs and exits immediately. This saves slot-time.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/0da8e3c3
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/0da8e3c3
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/0da8e3c3
Branch: refs/heads/master
Commit: 0da8e3c3e5b24ad70c1ab1a64ec6dd21d7fdea11
Parents: 1b8b153
Author: Chao Shi <ch...@apache.org>
Authored: Sat Feb 22 21:42:05 2014 +0800
Committer: Chao Shi <ch...@apache.org>
Committed: Sun Feb 23 16:55:07 2014 +0800
----------------------------------------------------------------------
.../crunch/impl/mr/exec/MRExecutorIT.java | 110 +++++++++++++++++++
.../lib/jobcontrol/CrunchJobControl.java | 4 +
.../apache/crunch/impl/mr/exec/MRExecutor.java | 2 +-
3 files changed, 115 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/0da8e3c3/crunch-core/src/it/java/org/apache/crunch/impl/mr/exec/MRExecutorIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mr/exec/MRExecutorIT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mr/exec/MRExecutorIT.java
new file mode 100644
index 0000000..729a12d
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/impl/mr/exec/MRExecutorIT.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.exec;
+
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PipelineExecution;
+import org.apache.crunch.impl.mr.MRJob;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.MRPipelineExecution;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.crunch.types.writable.Writables.longs;
+import static org.apache.crunch.types.writable.Writables.strings;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class MRExecutorIT {
+
+ private static class SleepForeverFn extends DoFn<Long, Long> { // DoFn that never emits; it only blocks
+ @Override
+ public void process(Long input, Emitter<Long> emitter) {
+ try {
+ Thread.sleep(Long.MAX_VALUE); // block indefinitely so the job cannot finish on its own
+ } catch (InterruptedException e) {
+ throw new CrunchRuntimeException(e);
+ }
+ }
+ }
+
+ @Rule
+ public TemporaryPath tmpDir = TemporaryPaths.create();
+
+ /**
+ * Tests that the pipeline stops immediately when one of its jobs fails, and
+ * that the jobs which are still running get killed.
+ */
+ @Test
+ public void testStopPipelineImmediatelyOnJobFailure() throws Exception {
+ String inPath = tmpDir.copyResourceFileName("shakes.txt");
+ MRPipeline pipeline = new MRPipeline(MRExecutorIT.class);
+
+ // Build two outputs from the same input; each one runs SleepForeverFn and so never finishes by itself.
+ PCollection<String> in = pipeline.read(From.textFile(inPath));
+ for (int i = 0; i < 2; i++) {
+ in.count()
+ .values()
+ .parallelDo(new SleepForeverFn(), longs())
+ .write(To.textFile(tmpDir.getPath("out_" + i)));
+ }
+ MRPipelineExecution exec = pipeline.runAsync();
+
+ // Poll every 100 ms, for at most 10 seconds, until both jobs report the RUNNING state.
+ List<MRJob> jobs = exec.getJobs();
+ assertEquals(2, jobs.size());
+ StopWatch watch = new StopWatch();
+ watch.start();
+ int numOfJobsSubmitted = 0;
+ while (numOfJobsSubmitted < 2 && watch.getTime() < 10000) {
+ numOfJobsSubmitted = 0; // recount from scratch on every poll
+ for (MRJob job : jobs) {
+ if (job.getJobState() == MRJob.State.RUNNING) {
+ numOfJobsSubmitted++;
+ }
+ }
+ Thread.sleep(100);
+ }
+ assertEquals(2, numOfJobsSubmitted);
+
+ // Kill the first job to simulate a job failure.
+ Job job0 = jobs.get(0).getJob();
+ job0.killJob();
+
+ // Expect the second job to complete within 10 seconds and the pipeline status to be FAILED.
+ StopWatch watch2 = new StopWatch();
+ watch2.start();
+ Job job1 = jobs.get(1).getJob();
+ while (!job1.isComplete() && watch2.getTime() < 10000) {
+ Thread.sleep(100);
+ }
+ assertTrue(job1.isComplete());
+ assertEquals(PipelineExecution.Status.FAILED, exec.getStatus());
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/0da8e3c3/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
index ce7a6d9..aac2ffa 100644
--- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
@@ -221,6 +221,10 @@ public class CrunchJobControl {
&& this.runningJobs.size() == 0;
}
+ public synchronized boolean anyFailures() { // true once failedJobs holds at least one entry
+ return !this.failedJobs.isEmpty();
+ }
+
/**
* Checks the states of the running jobs Update the states of waiting jobs, and submits the jobs in
* ready state (i.e. whose dependencies are all finished in success).
http://git-wip-us.apache.org/repos/asf/crunch/blob/0da8e3c3/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index 1137498..3eba7a1 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -108,7 +108,7 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe
private void monitorLoop() {
status.set(Status.RUNNING);
try {
- while (killSignal.getCount() > 0 && !control.allFinished()) {
+ while (killSignal.getCount() > 0 && !control.allFinished() && !control.anyFailures()) {
control.pollJobStatusAndStartNewOnes();
killSignal.await(pollInterval.get(), TimeUnit.MILLISECONDS);
}