You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/02/27 21:19:50 UTC
git commit: CRUNCH-156: Add a runAsync option to Pipeline that
returns a PipelineExecution interface that extends ListenableFuture and
allows clients to interactively control running pipelines.
Updated Branches:
refs/heads/master c0564f181 -> 236d97dbb
CRUNCH-156: Add a runAsync option to Pipeline that returns a PipelineExecution interface
that extends ListenableFuture and allows clients to interactively control running pipelines.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/236d97db
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/236d97db
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/236d97db
Branch: refs/heads/master
Commit: 236d97dbb249b3c4b98ec893893e2d2c591d0ae0
Parents: c0564f1
Author: Josh Wills <jw...@apache.org>
Authored: Tue Feb 26 20:14:49 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Wed Feb 27 11:54:19 2013 -0800
----------------------------------------------------------------------
.../it/java/org/apache/crunch/CancelJobsIT.java | 71 ++++++++
.../src/main/java/org/apache/crunch/Pipeline.java | 10 +
.../java/org/apache/crunch/PipelineExecution.java | 28 +++
.../lib/jobcontrol/CrunchControlledJob.java | 1 -
.../org/apache/crunch/impl/mem/MemPipeline.java | 58 ++++++-
.../java/org/apache/crunch/impl/mr/MRPipeline.java | 49 ++----
.../org/apache/crunch/impl/mr/exec/MRExecutor.java | 134 ++++++++++++---
.../apache/crunch/impl/mr/plan/MSCRPlanner.java | 14 +-
8 files changed, 302 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java b/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java
new file mode 100644
index 0000000..ecdc4e0
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CancellationException;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class CancelJobsIT {
+
+ @Rule
+ public TemporaryPath tmpDir = TemporaryPaths.create();
+
+ @Test
+ public void testRun() throws Exception {
+ run(false);
+ }
+
+ @Test
+ public void testCancel() throws Exception {
+ run(true);
+ }
+
+ public void run(boolean cancel) throws Exception {
+ String shakes = tmpDir.copyResourceFileName("shakes.txt");
+ String out = tmpDir.getFileName("cancel");
+ Pipeline p = new MRPipeline(CancelJobsIT.class, tmpDir.getDefaultConfiguration());
+ PCollection<String> words = p.readTextFile(shakes);
+ p.write(words.count().top(20), To.textFile(out));
+ PipelineExecution pe = p.runAsync();
+ if (cancel) {
+ boolean cancelled = false;
+ pe.cancel(true);
+ try {
+ pe.get();
+ } catch (CancellationException e) {
+ cancelled = true;
+ }
+ assertTrue(cancelled);
+ } else {
+ PipelineResult pr = pe.get();
+ assertEquals(2, pr.getStageResults().size());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/main/java/org/apache/crunch/Pipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Pipeline.java b/crunch/src/main/java/org/apache/crunch/Pipeline.java
index af1d86a..9540eac 100644
--- a/crunch/src/main/java/org/apache/crunch/Pipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/Pipeline.java
@@ -19,6 +19,8 @@ package org.apache.crunch;
import org.apache.hadoop.conf.Configuration;
+import com.google.common.util.concurrent.ListenableFuture;
+
/**
* Manages the state of a pipeline execution.
*
@@ -107,6 +109,14 @@ public interface Pipeline {
PipelineResult run();
/**
+ * Constructs and starts a series of MapReduce jobs in order to write data to
+ * the output targets, but returns a {@code ListenableFuture} to allow clients to control
+ * job execution.
+ * @return
+ */
+ PipelineExecution runAsync();
+
+ /**
* Run any remaining jobs required to generate outputs and then clean up any
* intermediate data files that were created in this run or previous calls to
* {@code run}.
http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/main/java/org/apache/crunch/PipelineExecution.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PipelineExecution.java b/crunch/src/main/java/org/apache/crunch/PipelineExecution.java
new file mode 100644
index 0000000..e74738f
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/PipelineExecution.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * A handle that extends the {@code ListenableFuture} interface to allow clients
+ * to control a Crunch pipeline as it runs.
+ */
+public interface PipelineExecution extends ListenableFuture<PipelineResult> {
+ String getPlanDotFile();
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
index 396ea2d..f30ada3 100644
--- a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
+++ b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.State;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 488cdd9..9c958a2 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -20,6 +20,10 @@ package org.apache.crunch.impl.mem;
import java.io.IOException;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -28,6 +32,7 @@ import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineExecution;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.Source;
import org.apache.crunch.TableSource;
@@ -49,6 +54,8 @@ import org.apache.hadoop.mapreduce.Counters;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
public class MemPipeline implements Pipeline {
@@ -212,9 +219,56 @@ public class MemPipeline implements Pipeline {
}
@Override
- public PipelineResult run() {
+ public PipelineExecution runAsync() {
activeTargets.clear();
- return PipelineResult.EMPTY;
+ final ListenableFuture<PipelineResult> lf = Futures.immediateFuture(PipelineResult.EMPTY);
+ return new PipelineExecution() {
+ @Override
+ public String getPlanDotFile() {
+ return "";
+ }
+
+ @Override
+ public void addListener(Runnable listener, Executor executor) {
+ lf.addListener(listener, executor);
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return lf.cancel(mayInterruptIfRunning);
+ }
+
+ @Override
+ public PipelineResult get() throws InterruptedException, ExecutionException {
+ return lf.get();
+ }
+
+ @Override
+ public PipelineResult get(long timeout, TimeUnit unit) throws InterruptedException,
+ ExecutionException, TimeoutException {
+ return lf.get(timeout, unit);
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return lf.isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return lf.isDone();
+ }
+ };
+ }
+
+ @Override
+ public PipelineResult run() {
+ try {
+ return runAsync().get();
+ } catch (Exception e) {
+ LOG.error("Exception running pipeline", e);
+ return PipelineResult.EMPTY;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index 2d4d137..19110b3 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -29,6 +29,7 @@ import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineExecution;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.Source;
import org.apache.crunch.SourceTarget;
@@ -58,6 +59,7 @@ import org.apache.hadoop.fs.Path;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
/**
* Pipeline implementation that is executed within Hadoop MapReduce.
@@ -138,7 +140,14 @@ public class MRPipeline implements Pipeline {
}
public MRExecutor plan() {
- MSCRPlanner planner = new MSCRPlanner(this, outputTargets);
+ Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize = Maps.newHashMap();
+ for (PCollectionImpl<?> c : outputTargets.keySet()) {
+ if (outputTargetsToMaterialize.containsKey(c)) {
+ toMaterialize.put(c, outputTargetsToMaterialize.get(c));
+ outputTargetsToMaterialize.remove(c);
+ }
+ }
+ MSCRPlanner planner = new MSCRPlanner(this, outputTargets, toMaterialize);
try {
return planner.plan(jarClass, conf);
} catch (IOException e) {
@@ -148,39 +157,17 @@ public class MRPipeline implements Pipeline {
@Override
public PipelineResult run() {
- PipelineResult res = null;
try {
- res = plan().execute();
- } catch (CrunchRuntimeException e) {
- LOG.error(e);
+ return runAsync().get();
+ } catch (Exception e) {
+ LOG.error("Exception running pipeline", e);
return PipelineResult.EMPTY;
}
- for (PCollectionImpl<?> c : outputTargets.keySet()) {
- if (outputTargetsToMaterialize.containsKey(c)) {
- MaterializableIterable iter = outputTargetsToMaterialize.get(c);
- if (iter.isSourceTarget()) {
- iter.materialize();
- c.materializeAt((SourceTarget) iter.getSource());
- }
- outputTargetsToMaterialize.remove(c);
- } else {
- boolean materialized = false;
- for (Target t : outputTargets.get(c)) {
- if (!materialized) {
- if (t instanceof SourceTarget) {
- c.materializeAt((SourceTarget) t);
- materialized = true;
- } else {
- SourceTarget st = t.asSourceTarget(c.getPType());
- if (st != null) {
- c.materializeAt(st);
- materialized = true;
- }
- }
- }
- }
- }
- }
+ }
+
+ @Override
+ public PipelineExecution runAsync() {
+ PipelineExecution res = plan().execute();
outputTargets.clear();
return res;
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index 9811600..791fe28 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -18,14 +18,23 @@
package org.apache.crunch.impl.mr.exec;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.PipelineExecution;
import org.apache.crunch.PipelineResult;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Target;
import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.materialize.MaterializableIterable;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.ListenableFuture;
/**
*
@@ -36,41 +45,116 @@ public class MRExecutor {
private static final Log LOG = LogFactory.getLog(MRExecutor.class);
private final CrunchJobControl control;
-
- public MRExecutor(Class<?> jarClass) {
+ private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
+ private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
+
+ private String planDotFile;
+
+ public MRExecutor(Class<?> jarClass, Map<PCollectionImpl<?>, Set<Target>> outputTargets,
+ Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) {
this.control = new CrunchJobControl(jarClass.toString());
+ this.outputTargets = outputTargets;
+ this.toMaterialize = toMaterialize;
}
public void addJob(CrunchJob job) {
this.control.addJob(job);
}
- public PipelineResult execute() {
- try {
- Thread controlThread = new Thread(control);
- controlThread.start();
- while (!control.allFinished()) {
- Thread.sleep(1000);
- }
- control.stop();
- } catch (InterruptedException e) {
- LOG.info(e);
+ public void setPlanDotFile(String planDotFile) {
+ this.planDotFile = planDotFile;
+ }
+
+ public PipelineExecution execute() {
+ FutureImpl fi = new FutureImpl();
+ fi.init();
+ return fi;
+ }
+
+ private class FutureImpl extends AbstractFuture<PipelineResult> implements PipelineExecution {
+ @Override
+ public String getPlanDotFile() {
+ return planDotFile;
}
- List<CrunchControlledJob> failures = control.getFailedJobList();
- if (!failures.isEmpty()) {
- System.err.println(failures.size() + " job failure(s) occurred:");
- for (CrunchControlledJob job : failures) {
- System.err.println(job.getJobName() + "(" + job.getJobID() + "): " + job.getMessage());
- }
+
+ public void init() {
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread controlThread = new Thread(control);
+ controlThread.start();
+ while (!control.allFinished()) {
+ Thread.sleep(1000);
+ }
+ control.stop();
+ } catch (InterruptedException e) {
+ setException(e);
+ return;
+ }
+
+ List<CrunchControlledJob> failures = control.getFailedJobList();
+ if (!failures.isEmpty()) {
+ System.err.println(failures.size() + " job failure(s) occurred:");
+ for (CrunchControlledJob job : failures) {
+ System.err.println(job.getJobName() + "(" + job.getJobID() + "): " + job.getMessage());
+ }
+ }
+ List<PipelineResult.StageResult> stages = Lists.newArrayList();
+ for (CrunchControlledJob job : control.getSuccessfulJobList()) {
+ try {
+ stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters()));
+ } catch (Exception e) {
+ LOG.error("Exception thrown fetching job counters for stage: " + job.getJobName(), e);
+ }
+ }
+
+ for (PCollectionImpl<?> c : outputTargets.keySet()) {
+ if (toMaterialize.containsKey(c)) {
+ MaterializableIterable iter = toMaterialize.get(c);
+ if (iter.isSourceTarget()) {
+ iter.materialize();
+ c.materializeAt((SourceTarget) iter.getSource());
+ }
+ } else {
+ boolean materialized = false;
+ for (Target t : outputTargets.get(c)) {
+ if (!materialized) {
+ if (t instanceof SourceTarget) {
+ c.materializeAt((SourceTarget) t);
+ materialized = true;
+ } else {
+ SourceTarget st = t.asSourceTarget(c.getPType());
+ if (st != null) {
+ c.materializeAt(st);
+ materialized = true;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ set(new PipelineResult(stages));
+ }
+ };
+ t.start();
}
- List<PipelineResult.StageResult> stages = Lists.newArrayList();
- for (CrunchControlledJob job : control.getSuccessfulJobList()) {
- try {
- stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters()));
- } catch (Exception e) {
- LOG.error("Exception thrown fetching job counters for stage: " + job.getJobName(), e);
+
+ @Override
+ public void interruptTask() {
+ if (!control.allFinished()) {
+ control.stop();
+ }
+ for (CrunchControlledJob job : control.getRunningJobList()) {
+ if (!job.isCompleted()) {
+ try {
+ job.killJob();
+ } catch (Exception e) {
+ LOG.error("Exception killing job: " + job.getJobName(), e);
+ }
+ }
}
}
- return new PipelineResult(stages);
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 472505b..146bcbf 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -32,6 +32,7 @@ import org.apache.crunch.impl.mr.collect.InputCollection;
import org.apache.crunch.impl.mr.collect.PCollectionImpl;
import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
import org.apache.crunch.impl.mr.exec.MRExecutor;
+import org.apache.crunch.materialize.MaterializableIterable;
import org.apache.hadoop.conf.Configuration;
import com.google.common.collect.HashMultimap;
@@ -44,11 +45,14 @@ public class MSCRPlanner {
private final MRPipeline pipeline;
private final Map<PCollectionImpl<?>, Set<Target>> outputs;
-
- public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs) {
+ private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
+
+ public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs,
+ Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) {
this.pipeline = pipeline;
this.outputs = new TreeMap<PCollectionImpl<?>, Set<Target>>(DEPTH_COMPARATOR);
this.outputs.putAll(outputs);
+ this.toMaterialize = toMaterialize;
}
// Used to ensure that we always build pipelines starting from the deepest
@@ -148,13 +152,15 @@ public class MSCRPlanner {
// Finally, construct the jobs from the prototypes and return.
DotfileWriter dotfileWriter = new DotfileWriter();
- MRExecutor exec = new MRExecutor(jarClass);
+ MRExecutor exec = new MRExecutor(jarClass, outputs, toMaterialize);
for (JobPrototype proto : Sets.newHashSet(assignments.values())) {
dotfileWriter.addJobPrototype(proto);
exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline));
}
- conf.set(PlanningParameters.PIPELINE_PLAN_DOTFILE, dotfileWriter.buildDotfile());
+ String planDotFile = dotfileWriter.buildDotfile();
+ exec.setPlanDotFile(planDotFile);
+ conf.set(PlanningParameters.PIPELINE_PLAN_DOTFILE, planDotFile);
return exec;
}