You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/03/02 20:05:06 UTC
git commit: CRUNCH-156: Refactor PipelineExecution interface to
provide more info/control over running jobs. Contributed by Chao Shi.
Updated Branches:
refs/heads/master 453bff6fe -> d36b69ab8
CRUNCH-156: Refactor PipelineExecution interface to provide more info/control over running jobs. Contributed by Chao Shi.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d36b69ab
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d36b69ab
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d36b69ab
Branch: refs/heads/master
Commit: d36b69ab8d07daaca3ffdad6082b26acd6d1e49b
Parents: 453bff6
Author: Josh Wills <jw...@apache.org>
Authored: Sat Mar 2 11:04:30 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Sat Mar 2 11:04:30 2013 -0800
----------------------------------------------------------------------
.../it/java/org/apache/crunch/CancelJobsIT.java | 53 +++--
.../java/org/apache/crunch/PipelineExecution.java | 34 +++-
.../lib/jobcontrol/CrunchControlledJob.java | 6 +-
.../org/apache/crunch/impl/mem/MemPipeline.java | 36 +--
.../java/org/apache/crunch/impl/mr/MRPipeline.java | 8 +-
.../org/apache/crunch/impl/mr/exec/MRExecutor.java | 198 +++++++++------
6 files changed, 198 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/d36b69ab/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java b/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java
index ecdc4e0..ff01a2f 100644
--- a/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java
+++ b/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java
@@ -20,7 +20,7 @@ package org.apache.crunch;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import java.util.concurrent.CancellationException;
+import java.io.IOException;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.To;
@@ -36,36 +36,49 @@ public class CancelJobsIT {
@Rule
public TemporaryPath tmpDir = TemporaryPaths.create();
-
+
@Test
public void testRun() throws Exception {
- run(false);
+ PipelineExecution pe = run();
+ pe.waitUntilDone();
+ PipelineResult pr = pe.getResult();
+ assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus());
+ assertEquals(2, pr.getStageResults().size());
}
@Test
- public void testCancel() throws Exception {
- run(true);
+ public void testKill() throws Exception {
+ PipelineExecution pe = run();
+ pe.kill();
+ pe.waitUntilDone();
+ assertEquals(PipelineExecution.Status.KILLED, pe.getStatus());
+ }
+
+ @Test
+ public void testKillMultipleTimes() throws Exception {
+ PipelineExecution pe = run();
+ for (int i = 0; i < 10; i++) {
+ pe.kill();
+ }
+ pe.waitUntilDone();
+ assertEquals(PipelineExecution.Status.KILLED, pe.getStatus());
+ }
+
+ @Test
+ public void testKillAfterDone() throws Exception {
+ PipelineExecution pe = run();
+ pe.waitUntilDone();
+ assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus());
+ pe.kill(); // expect no-op
+ assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus());
}
- public void run(boolean cancel) throws Exception {
+ public PipelineExecution run() throws IOException {
String shakes = tmpDir.copyResourceFileName("shakes.txt");
String out = tmpDir.getFileName("cancel");
Pipeline p = new MRPipeline(CancelJobsIT.class, tmpDir.getDefaultConfiguration());
PCollection<String> words = p.readTextFile(shakes);
p.write(words.count().top(20), To.textFile(out));
- PipelineExecution pe = p.runAsync();
- if (cancel) {
- boolean cancelled = false;
- pe.cancel(true);
- try {
- pe.get();
- } catch (CancellationException e) {
- cancelled = true;
- }
- assertTrue(cancelled);
- } else {
- PipelineResult pr = pe.get();
- assertEquals(2, pr.getStageResults().size());
- }
+ return p.runAsync(); // NOTE: if this test becomes flaky, job start-up may need to be artificially slowed down.
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d36b69ab/crunch/src/main/java/org/apache/crunch/PipelineExecution.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PipelineExecution.java b/crunch/src/main/java/org/apache/crunch/PipelineExecution.java
index e74738f..fc6bb91 100644
--- a/crunch/src/main/java/org/apache/crunch/PipelineExecution.java
+++ b/crunch/src/main/java/org/apache/crunch/PipelineExecution.java
@@ -17,12 +17,38 @@
*/
package org.apache.crunch;
-import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.TimeUnit;
/**
- * A handle that extends the {@code ListenableFuture} interface to allow clients
- * to control a Crunch pipeline as it runs.
+ * A handle to allow clients to control a Crunch pipeline as it runs.
+ *
+ * This interface is thread-safe.
*/
-public interface PipelineExecution extends ListenableFuture<PipelineResult> {
+public interface PipelineExecution {
+
+ enum Status { READY, RUNNING, SUCCEEDED, FAILED, KILLED }
+
+ /** Returns the .dot file that allows a client to graph the Crunch execution plan for this
+ * pipeline.
+ */
String getPlanDotFile();
+
+ /** Blocks until the pipeline completes or the specified waiting time elapses. */
+ void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException;
+
+ /** Blocks until pipeline completes, i.e. {@code SUCCEEDED}, {@code FAILED} or {@code KILLED}. */
+ void waitUntilDone() throws InterruptedException;
+
+ Status getStatus();
+
+ /** Retrieve the result of a pipeline if it has been completed, otherwise {@code null}. */
+ PipelineResult getResult();
+
+ /**
+ * Kills the pipeline if it is running, no-op otherwise.
+ *
+ * This method only delivers a kill signal to the pipeline; it does not guarantee that the pipeline has exited when it returns.
+ * To wait for the pipeline to exit completely, call {@link #waitUntilDone()} after this call.
+ */
+ void kill() throws InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d36b69ab/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
index f30ada3..223673e 100644
--- a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
+++ b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
@@ -33,10 +33,10 @@ import org.apache.hadoop.util.StringUtils;
* This class encapsulates a MapReduce job and its dependency. It monitors the
* states of the depending jobs and updates the state of this job. A job starts
* in the WAITING state. If it does not have any depending jobs, or all of the
- * depending jobs are in SUCCESS state, then the job state will become READY. If
+ * depending jobs are in SUCCEEDED state, then the job state will become READY. If
* any depending jobs fail, the job will fail too. When in READY state, the job
* can be submitted to Hadoop for execution, with the state changing into
- * RUNNING state. From RUNNING state, the job can get into SUCCESS or FAILED
+ * RUNNING state. From RUNNING state, the job can get into SUCCEEDED or FAILED
* state, depending the status of the job execution.
*/
public class CrunchControlledJob {
@@ -244,7 +244,7 @@ public class CrunchControlledJob {
/**
* Check the state of this running job. The state may remain the same, become
- * SUCCESS or FAILED.
+ * SUCCEEDED or FAILED.
*/
protected void checkRunningState() throws IOException, InterruptedException {
try {
http://git-wip-us.apache.org/repos/asf/crunch/blob/d36b69ab/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 9c958a2..d7c1a4f 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -20,10 +20,7 @@ package org.apache.crunch.impl.mem;
import java.io.IOException;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -227,48 +224,37 @@ public class MemPipeline implements Pipeline {
public String getPlanDotFile() {
return "";
}
-
- @Override
- public void addListener(Runnable listener, Executor executor) {
- lf.addListener(listener, executor);
- }
@Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return lf.cancel(mayInterruptIfRunning);
+ public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException {
+ // no-op
}
@Override
- public PipelineResult get() throws InterruptedException, ExecutionException {
- return lf.get();
+ public void waitUntilDone() throws InterruptedException {
+ // no-op
}
@Override
- public PipelineResult get(long timeout, TimeUnit unit) throws InterruptedException,
- ExecutionException, TimeoutException {
- return lf.get(timeout, unit);
+ public Status getStatus() {
+ return Status.SUCCEEDED;
}
@Override
- public boolean isCancelled() {
- return lf.isCancelled();
+ public PipelineResult getResult() {
+ return PipelineResult.EMPTY;
}
@Override
- public boolean isDone() {
- return lf.isDone();
+ public void kill() {
}
};
}
@Override
public PipelineResult run() {
- try {
- return runAsync().get();
- } catch (Exception e) {
- LOG.error("Exception running pipeline", e);
- return PipelineResult.EMPTY;
- }
+ activeTargets.clear();
+ return PipelineResult.EMPTY;
}
@Override
http://git-wip-us.apache.org/repos/asf/crunch/blob/d36b69ab/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index 19110b3..00cf486 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -59,7 +59,6 @@ import org.apache.hadoop.fs.Path;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListenableFuture;
/**
* Pipeline implementation that is executed within Hadoop MapReduce.
@@ -158,8 +157,11 @@ public class MRPipeline implements Pipeline {
@Override
public PipelineResult run() {
try {
- return runAsync().get();
- } catch (Exception e) {
+ PipelineExecution pipelineExecution = runAsync();
+ pipelineExecution.waitUntilDone();
+ return pipelineExecution.getResult();
+ } catch (InterruptedException e) {
+ // TODO: How to handle this without changing signature?
LOG.error("Exception running pipeline", e);
return PipelineResult.EMPTY;
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d36b69ab/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index 791fe28..901f91a 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -20,6 +20,9 @@ package org.apache.crunch.impl.mr.exec;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,21 +36,24 @@ import org.apache.crunch.impl.mr.collect.PCollectionImpl;
import org.apache.crunch.materialize.MaterializableIterable;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.ListenableFuture;
/**
*
*
*/
-public class MRExecutor {
+public class MRExecutor implements PipelineExecution {
private static final Log LOG = LogFactory.getLog(MRExecutor.class);
private final CrunchJobControl control;
private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
-
+ private final CountDownLatch doneSignal = new CountDownLatch(1);
+ private final CountDownLatch killSignal = new CountDownLatch(1);
+ private AtomicReference<Status> status = new AtomicReference<Status>(Status.READY);
+ private PipelineResult result;
+ private Thread monitorThread;
+
private String planDotFile;
public MRExecutor(Class<?> jarClass, Map<PCollectionImpl<?>, Set<Target>> outputTargets,
@@ -55,6 +61,12 @@ public class MRExecutor {
this.control = new CrunchJobControl(jarClass.toString());
this.outputTargets = outputTargets;
this.toMaterialize = toMaterialize;
+ this.monitorThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ monitorLoop();
+ }
+ });
}
public void addJob(CrunchJob job) {
@@ -66,95 +78,117 @@ public class MRExecutor {
}
public PipelineExecution execute() {
- FutureImpl fi = new FutureImpl();
- fi.init();
- return fi;
+ monitorThread.start();
+ return this;
}
-
- private class FutureImpl extends AbstractFuture<PipelineResult> implements PipelineExecution {
- @Override
- public String getPlanDotFile() {
- return planDotFile;
- }
-
- public void init() {
- Thread t = new Thread() {
- @Override
- public void run() {
- try {
- Thread controlThread = new Thread(control);
- controlThread.start();
- while (!control.allFinished()) {
- Thread.sleep(1000);
- }
- control.stop();
- } catch (InterruptedException e) {
- setException(e);
- return;
- }
-
- List<CrunchControlledJob> failures = control.getFailedJobList();
- if (!failures.isEmpty()) {
- System.err.println(failures.size() + " job failure(s) occurred:");
- for (CrunchControlledJob job : failures) {
- System.err.println(job.getJobName() + "(" + job.getJobID() + "): " + job.getMessage());
- }
- }
- List<PipelineResult.StageResult> stages = Lists.newArrayList();
- for (CrunchControlledJob job : control.getSuccessfulJobList()) {
- try {
- stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters()));
- } catch (Exception e) {
- LOG.error("Exception thrown fetching job counters for stage: " + job.getJobName(), e);
- }
+
+ /** Monitors the running status of the pipeline's jobs. It is called on {@code monitorThread}. */
+ private void monitorLoop() {
+ try {
+ Thread controlThread = new Thread(control);
+ controlThread.start();
+ while (killSignal.getCount() > 0 && !control.allFinished()) {
+ killSignal.await(1, TimeUnit.SECONDS);
+ }
+ control.stop();
+ killAllRunningJobs();
+
+ List<CrunchControlledJob> failures = control.getFailedJobList();
+ if (!failures.isEmpty()) {
+ System.err.println(failures.size() + " job failure(s) occurred:");
+ for (CrunchControlledJob job : failures) {
+ System.err.println(job.getJobName() + "(" + job.getJobID() + "): " + job.getMessage());
+ }
+ }
+ List<PipelineResult.StageResult> stages = Lists.newArrayList();
+ for (CrunchControlledJob job : control.getSuccessfulJobList()) {
+ try {
+ stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters()));
+ } catch (Exception e) {
+ LOG.error("Exception thrown fetching job counters for stage: " + job.getJobName(), e);
+ }
+ }
+
+ for (PCollectionImpl<?> c : outputTargets.keySet()) {
+ if (toMaterialize.containsKey(c)) {
+ MaterializableIterable iter = toMaterialize.get(c);
+ if (iter.isSourceTarget()) {
+ iter.materialize();
+ c.materializeAt((SourceTarget) iter.getSource());
}
-
- for (PCollectionImpl<?> c : outputTargets.keySet()) {
- if (toMaterialize.containsKey(c)) {
- MaterializableIterable iter = toMaterialize.get(c);
- if (iter.isSourceTarget()) {
- iter.materialize();
- c.materializeAt((SourceTarget) iter.getSource());
- }
- } else {
- boolean materialized = false;
- for (Target t : outputTargets.get(c)) {
- if (!materialized) {
- if (t instanceof SourceTarget) {
- c.materializeAt((SourceTarget) t);
- materialized = true;
- } else {
- SourceTarget st = t.asSourceTarget(c.getPType());
- if (st != null) {
- c.materializeAt(st);
- materialized = true;
- }
- }
+ } else {
+ boolean materialized = false;
+ for (Target t : outputTargets.get(c)) {
+ if (!materialized) {
+ if (t instanceof SourceTarget) {
+ c.materializeAt((SourceTarget) t);
+ materialized = true;
+ } else {
+ SourceTarget st = t.asSourceTarget(c.getPType());
+ if (st != null) {
+ c.materializeAt(st);
+ materialized = true;
}
}
}
}
+ }
+ }
- set(new PipelineResult(stages));
+ synchronized (this) {
+ result = new PipelineResult(stages);
+ if (killSignal.getCount() == 0) {
+ status.set(Status.KILLED);
+ } else {
+ status.set(result.succeeded() ? Status.SUCCEEDED : Status.FAILED);
}
- };
- t.start();
- }
-
- @Override
- public void interruptTask() {
- if (!control.allFinished()) {
- control.stop();
}
- for (CrunchControlledJob job : control.getRunningJobList()) {
- if (!job.isCompleted()) {
- try {
- job.killJob();
- } catch (Exception e) {
- LOG.error("Exception killing job: " + job.getJobName(), e);
- }
+ } catch (InterruptedException e) {
+ throw new AssertionError(e); // Nobody should interrupt us.
+ } finally {
+ doneSignal.countDown();
+ }
+ }
+
+ private void killAllRunningJobs() {
+ for (CrunchControlledJob job : control.getRunningJobList()) {
+ if (!job.isCompleted()) {
+ try {
+ job.killJob();
+ } catch (Exception e) {
+ LOG.error("Exception killing job: " + job.getJobName(), e);
}
}
}
}
+
+ @Override
+ public String getPlanDotFile() {
+ return planDotFile;
+ }
+
+ @Override
+ public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException {
+ doneSignal.await(timeout, timeUnit);
+ }
+
+ @Override
+ public void waitUntilDone() throws InterruptedException {
+ doneSignal.await();
+ }
+
+ @Override
+ public synchronized Status getStatus() {
+ return status.get();
+ }
+
+ @Override
+ public synchronized PipelineResult getResult() {
+ return result;
+ }
+
+ @Override
+ public void kill() throws InterruptedException {
+ killSignal.countDown();
+ }
}