You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by ch...@apache.org on 2014/02/23 10:38:29 UTC

git commit: CRUNCH-353: When a job failure is noticed, MRPipeline kills all its running jobs and exits immediately. This saves slot-time.

Repository: crunch
Updated Branches:
  refs/heads/master 1b8b15315 -> 0da8e3c3e


CRUNCH-353: When a job failure is noticed, MRPipeline kills all its running jobs and exits immediately. This saves slot-time.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/0da8e3c3
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/0da8e3c3
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/0da8e3c3

Branch: refs/heads/master
Commit: 0da8e3c3e5b24ad70c1ab1a64ec6dd21d7fdea11
Parents: 1b8b153
Author: Chao Shi <ch...@apache.org>
Authored: Sat Feb 22 21:42:05 2014 +0800
Committer: Chao Shi <ch...@apache.org>
Committed: Sun Feb 23 16:55:07 2014 +0800

----------------------------------------------------------------------
 .../crunch/impl/mr/exec/MRExecutorIT.java       | 110 +++++++++++++++++++
 .../lib/jobcontrol/CrunchJobControl.java        |   4 +
 .../apache/crunch/impl/mr/exec/MRExecutor.java  |   2 +-
 3 files changed, 115 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/0da8e3c3/crunch-core/src/it/java/org/apache/crunch/impl/mr/exec/MRExecutorIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mr/exec/MRExecutorIT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mr/exec/MRExecutorIT.java
new file mode 100644
index 0000000..729a12d
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/impl/mr/exec/MRExecutorIT.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.exec;
+
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PipelineExecution;
+import org.apache.crunch.impl.mr.MRJob;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.MRPipelineExecution;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.crunch.types.writable.Writables.longs;
+import static org.apache.crunch.types.writable.Writables.strings;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class MRExecutorIT {
+
+  /** Upper bound, in milliseconds, for each wait performed by the test. */
+  private static final long TIMEOUT_MS = 10000L;
+
+  /** Interval, in milliseconds, between successive polls of job state. */
+  private static final long POLL_INTERVAL_MS = 100L;
+
+  /**
+   * A DoFn that never emits anything: it sleeps until its thread is
+   * interrupted, so a job containing it keeps running until it is killed.
+   */
+  private static class SleepForeverFn extends DoFn<Long, Long> {
+    @Override
+    public void process(Long input, Emitter<Long> emitter) {
+      try {
+        Thread.sleep(Long.MAX_VALUE);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so callers up the stack can still observe it.
+        Thread.currentThread().interrupt();
+        throw new CrunchRuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  /**
+   * Tests that the pipeline stops immediately when one of its jobs fails, and
+   * that the rest of the running jobs are killed instead of being left to run.
+   */
+  @Test
+  public void testStopPipelineImmediatelyOnJobFailure() throws Exception {
+    String inPath = tmpDir.copyResourceFileName("shakes.txt");
+    MRPipeline pipeline = new MRPipeline(MRExecutorIT.class);
+
+    // Issue two independent jobs that sleep forever.
+    PCollection<String> in = pipeline.read(From.textFile(inPath));
+    for (int i = 0; i < 2; i++) {
+      in.count()
+          .values()
+          .parallelDo(new SleepForeverFn(), longs())
+          .write(To.textFile(tmpDir.getPath("out_" + i)));
+    }
+    MRPipelineExecution exec = pipeline.runAsync();
+
+    // Wait until both of the two jobs are running (or the timeout elapses).
+    List<MRJob> jobs = exec.getJobs();
+    assertEquals(2, jobs.size());
+    StopWatch submitWatch = new StopWatch();
+    submitWatch.start();
+    int numOfJobsRunning = 0;
+    while (submitWatch.getTime() < TIMEOUT_MS) {
+      numOfJobsRunning = 0;
+      for (MRJob job : jobs) {
+        if (job.getJobState() == MRJob.State.RUNNING) {
+          numOfJobsRunning++;
+        }
+      }
+      if (numOfJobsRunning == 2) {
+        break;
+      }
+      Thread.sleep(POLL_INTERVAL_MS);
+    }
+    assertEquals(2, numOfJobsRunning);
+
+    // Kill one of them; the pipeline should treat this as a job failure.
+    Job job0 = jobs.get(0).getJob();
+    job0.killJob();
+
+    // Wait for the whole execution to finish before checking its status. The
+    // executor kills the remaining jobs before publishing its final status, so
+    // reading the status as soon as job1 completes would race with it.
+    exec.waitFor(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    assertEquals(PipelineExecution.Status.FAILED, exec.getStatus());
+
+    // Expect the other job to have been killed by the pipeline.
+    Job job1 = jobs.get(1).getJob();
+    StopWatch killWatch = new StopWatch();
+    killWatch.start();
+    while (!job1.isComplete() && killWatch.getTime() < TIMEOUT_MS) {
+      Thread.sleep(POLL_INTERVAL_MS);
+    }
+    assertTrue(job1.isComplete());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/0da8e3c3/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
index ce7a6d9..aac2ffa 100644
--- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
@@ -221,6 +221,10 @@ public class CrunchJobControl {
         && this.runningJobs.size() == 0;
   }
 
+  /**
+   * Reports whether any job has been recorded in {@code failedJobs}. The
+   * executor's monitor loop checks this so it can stop polling as soon as a
+   * failure is seen, rather than waiting for every job to finish.
+   */
+  synchronized public boolean anyFailures() {
+    return !this.failedJobs.isEmpty();
+  }
+
   /**
    * Checks the states of the running jobs Update the states of waiting jobs, and submits the jobs in
    * ready state (i.e. whose dependencies are all finished in success).

http://git-wip-us.apache.org/repos/asf/crunch/blob/0da8e3c3/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index 1137498..3eba7a1 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -108,7 +108,7 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe
   private void monitorLoop() {
     status.set(Status.RUNNING);
     try {
-      while (killSignal.getCount() > 0 && !control.allFinished()) {
+      while (killSignal.getCount() > 0 && !control.allFinished() && !control.anyFailures()) {
         control.pollJobStatusAndStartNewOnes();
         killSignal.await(pollInterval.get(), TimeUnit.MILLISECONDS);
       }