You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/10/06 19:40:11 UTC
git commit: CRUNCH-273: Make PipelineExecution implement ListenableFuture
Updated Branches:
refs/heads/master ecf5dd01c -> 7a8af2865
CRUNCH-273: Make PipelineExecution implement ListenableFuture
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/7a8af286
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/7a8af286
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/7a8af286
Branch: refs/heads/master
Commit: 7a8af286529aeacb07bd7bc6daf313d5e3f31b49
Parents: ecf5dd0
Author: Josh Wills <jw...@apache.org>
Authored: Thu Oct 3 00:31:58 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Sun Oct 6 10:37:24 2013 -0700
----------------------------------------------------------------------
.../it/java/org/apache/crunch/CancelJobsIT.java | 36 +++++++-
.../org/apache/crunch/PipelineExecution.java | 4 +-
.../org/apache/crunch/impl/mem/MemPipeline.java | 97 +++++++++++++-------
.../apache/crunch/impl/mr/exec/MRExecutor.java | 42 ++++++++-
4 files changed, 140 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/7a8af286/crunch-core/src/it/java/org/apache/crunch/CancelJobsIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/CancelJobsIT.java b/crunch-core/src/it/java/org/apache/crunch/CancelJobsIT.java
index ff01a2f..5fbca57 100644
--- a/crunch-core/src/it/java/org/apache/crunch/CancelJobsIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/CancelJobsIT.java
@@ -18,6 +18,7 @@
package org.apache.crunch;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -45,7 +46,15 @@ public class CancelJobsIT {
assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus());
assertEquals(2, pr.getStageResults().size());
}
-
+
+ @Test
+ public void testGet() throws Exception {
+ PipelineExecution pe = run();
+ PipelineResult pr = pe.get();
+ assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus());
+ assertEquals(2, pr.getStageResults().size());
+ }
+
@Test
public void testKill() throws Exception {
PipelineExecution pe = run();
@@ -55,6 +64,31 @@ public class CancelJobsIT {
}
@Test
+ public void testKillGet() throws Exception {
+ PipelineExecution pe = run();
+ pe.kill();
+ PipelineResult res = pe.get();
+ assertFalse(res.succeeded());
+ assertEquals(PipelineExecution.Status.KILLED, pe.getStatus());
+ }
+
+ @Test
+ public void testCancelNoInterrupt() throws Exception {
+ PipelineExecution pe = run();
+ pe.cancel(false);
+ pe.waitUntilDone();
+ assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus());
+ }
+
+ @Test
+ public void testCancelMayInterrupt() throws Exception {
+ PipelineExecution pe = run();
+ pe.cancel(true);
+ pe.waitUntilDone();
+ assertEquals(PipelineExecution.Status.KILLED, pe.getStatus());
+ }
+
+ @Test
public void testKillMultipleTimes() throws Exception {
PipelineExecution pe = run();
for (int i = 0; i < 10; i++) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/7a8af286/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java b/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java
index fc6bb91..af6a177 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java
@@ -17,6 +17,8 @@
*/
package org.apache.crunch;
+import com.google.common.util.concurrent.ListenableFuture;
+
import java.util.concurrent.TimeUnit;
/**
@@ -24,7 +26,7 @@ import java.util.concurrent.TimeUnit;
*
* This interface is thread-safe.
*/
-public interface PipelineExecution {
+public interface PipelineExecution extends ListenableFuture<PipelineResult> {
enum Status { READY, RUNNING, SUCCEEDED, FAILED, KILLED }
http://git-wip-us.apache.org/repos/asf/crunch/blob/7a8af286/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 60677fc..cc9ad69 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -20,8 +20,11 @@ package org.apache.crunch.impl.mem;
import java.io.IOException;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import com.google.common.util.concurrent.AbstractFuture;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericContainer;
@@ -306,44 +309,16 @@ public class MemPipeline implements Pipeline {
@Override
public PipelineExecution runAsync() {
activeTargets.clear();
- return new PipelineExecution() {
- @Override
- public String getPlanDotFile() {
- return "";
- }
-
- @Override
- public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException {
- // no-po
- }
-
- @Override
- public void waitUntilDone() throws InterruptedException {
- // no-po
- }
-
- @Override
- public Status getStatus() {
- return Status.SUCCEEDED;
- }
-
- @Override
- public PipelineResult getResult() {
- return new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage", COUNTERS)),
- Status.SUCCEEDED);
- }
-
- @Override
- public void kill() {
- }
- };
+ return new MemExecution();
}
@Override
public PipelineResult run() {
- activeTargets.clear();
- return new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage", COUNTERS)),
- PipelineExecution.Status.SUCCEEDED);
+ try {
+ return runAsync().get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
@Override
@@ -360,4 +335,58 @@ public class MemPipeline implements Pipeline {
public String getName() {
return "Memory Pipeline";
}
+
+ private static class MemExecution extends AbstractFuture<PipelineResult> implements PipelineExecution {
+
+ private PipelineResult res;
+
+ public MemExecution() {
+ this.res = new PipelineResult(
+ ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage", COUNTERS)),
+ PipelineExecution.Status.SUCCEEDED);
+ }
+
+ @Override
+ public String getPlanDotFile() {
+ return "";
+ }
+
+ @Override
+ public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException {
+ set(res);
+ }
+
+ @Override
+ public void waitUntilDone() throws InterruptedException {
+ set(res);
+ }
+
+ @Override
+ public PipelineResult get() throws ExecutionException, InterruptedException {
+ set(res);
+ return super.get();
+ }
+
+ @Override
+ public PipelineResult get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException,
+ TimeoutException {
+ set(res);
+ return super.get(timeout, timeUnit);
+ }
+
+ @Override
+ public Status getStatus() {
+ return isDone() ? Status.SUCCEEDED : Status.READY;
+ }
+
+ @Override
+ public PipelineResult getResult() {
+ return isDone() ? res : null;
+ }
+
+ @Override
+ public void kill() {
+ // No-op
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/7a8af286/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index 532e37c..a655b23 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -19,6 +19,7 @@ package org.apache.crunch.impl.mr.exec;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractFuture;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.PipelineResult;
@@ -37,7 +38,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -49,7 +52,7 @@ import java.util.concurrent.atomic.AtomicReference;
*
* It is thread-safe.
*/
-public class MRExecutor implements MRPipelineExecution {
+public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipelineExecution {
private static final Log LOG = LogFactory.getLog(MRExecutor.class);
@@ -62,6 +65,7 @@ public class MRExecutor implements MRPipelineExecution {
private AtomicReference<Status> status = new AtomicReference<Status>(Status.READY);
private PipelineResult result;
private Thread monitorThread;
+ private boolean started;
private String planDotFile;
@@ -89,13 +93,17 @@ public class MRExecutor implements MRPipelineExecution {
this.planDotFile = planDotFile;
}
- public MRPipelineExecution execute() {
- monitorThread.start();
+ public synchronized MRPipelineExecution execute() {
+ if (!started) {
+ monitorThread.start();
+ started = true;
+ }
return this;
}
/** Monitors running status. It is called in {@code MonitorThread}. */
private void monitorLoop() {
+ status.set(Status.RUNNING);
try {
while (killSignal.getCount() > 0 && !control.allFinished()) {
control.pollJobStatusAndStartNewOnes();
@@ -150,12 +158,14 @@ public class MRExecutor implements MRPipelineExecution {
status.set(Status.SUCCEEDED);
}
result = new PipelineResult(stages, status.get());
+ set(result);
}
} catch (InterruptedException e) {
throw new AssertionError(e); // Nobody should interrupt us.
} catch (IOException e) {
LOG.error("Pipeline failed due to exception", e);
status.set(Status.FAILED);
+ setException(e);
} finally {
doneSignal.countDown();
}
@@ -177,6 +187,23 @@ public class MRExecutor implements MRPipelineExecution {
}
@Override
+ public PipelineResult get() throws InterruptedException, ExecutionException {
+ if (getStatus() == Status.READY) {
+ execute();
+ }
+ return super.get();
+ }
+
+ @Override
+ public PipelineResult get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException,
+ ExecutionException {
+ if (getStatus() == Status.READY) {
+ execute();
+ }
+ return super.get(timeout, unit);
+ }
+
+ @Override
public synchronized Status getStatus() {
return status.get();
}
@@ -191,6 +218,15 @@ public class MRExecutor implements MRPipelineExecution {
killSignal.countDown();
}
+ @Override
+ protected void interruptTask() {
+ try {
+ kill();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private static boolean isLocalMode() {
Configuration conf = new Configuration();
// Try to handle MapReduce version 0.20 or 0.22