You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/10/15 01:08:36 UTC
[2/3] incubator-beam git commit: Stop Catching Errors in the
DirectRunner
Stop Catching Errors in the DirectRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5e51c840
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5e51c840
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5e51c840
Branch: refs/heads/master
Commit: 5e51c84003c2c9e03d51f94cbc2be07569bf090e
Parents: c2c650a
Author: Thomas Groh <tg...@google.com>
Authored: Fri Oct 14 10:32:14 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Oct 14 17:21:18 2016 -0700
----------------------------------------------------------------------
.../beam/runners/direct/CompletionCallback.java | 4 +-
.../beam/runners/direct/DirectRunner.java | 14 ++-----
.../direct/ExecutorServiceParallelExecutor.java | 40 ++++++++++----------
.../beam/runners/direct/PipelineExecutor.java | 2 +-
.../beam/runners/direct/TransformExecutor.java | 10 ++---
.../runners/direct/TransformExecutorTest.java | 16 ++++----
6 files changed, 39 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e51c840/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
index 8e51d6f..2986df1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
@@ -38,7 +38,7 @@ interface CompletionCallback {
void handleEmpty(AppliedPTransform<?, ?, ?> transform);
/**
- * Handle a result that terminated abnormally due to the provided {@link Throwable}.
+ * Handle a result that terminated abnormally due to the provided {@link Exception}.
*/
- void handleThrowable(CommittedBundle<?> inputBundle, Throwable t);
+ void handleException(CommittedBundle<?> inputBundle, Exception t);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e51c840/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 6ef2472..664a915 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -403,18 +403,10 @@ public class DirectRunner
*
* <p>See also {@link PipelineExecutor#awaitCompletion()}.
*/
- public State awaitCompletion() throws Throwable {
+ public State awaitCompletion() throws Exception {
if (!state.isTerminal()) {
- try {
- executor.awaitCompletion();
- state = State.DONE;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw e;
- } catch (Throwable t) {
- state = State.FAILED;
- throw t;
- }
+ executor.awaitCompletion();
+ state = State.DONE;
}
return state;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e51c840/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 3274524..e32f671 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -234,7 +234,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
}
@Override
- public void awaitCompletion() throws Throwable {
+ public void awaitCompletion() throws Exception {
VisibleExecutorUpdate update;
do {
// Get an update; don't block forever if another thread has handled it
@@ -243,8 +243,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
// there are no updates to process and no updates will ever be published because the
// executor is shutdown
return;
- } else if (update != null && update.throwable.isPresent()) {
- throw update.throwable.get();
+ } else if (update != null && update.exception.isPresent()) {
+ throw update.exception.get();
}
} while (update == null || !update.isDone());
executorService.shutdown();
@@ -253,7 +253,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
/**
* The base implementation of {@link CompletionCallback} that provides implementations for
* {@link #handleResult(CommittedBundle, TransformResult)} and
- * {@link #handleThrowable(CommittedBundle, Throwable)}.
+ * {@link #handleException(CommittedBundle, Exception)}.
*/
private class TimerIterableCompletionCallback implements CompletionCallback {
private final Iterable<TimerData> timers;
@@ -296,8 +296,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
}
@Override
- public final void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
- allUpdates.offer(ExecutorUpdate.fromThrowable(t));
+ public final void handleException(CommittedBundle<?> inputBundle, Exception e) {
+ allUpdates.offer(ExecutorUpdate.fromException(e));
outstandingWork.decrementAndGet();
}
}
@@ -315,14 +315,14 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate(
Optional.of(bundle),
consumers,
- Optional.<Throwable>absent());
+ Optional.<Exception>absent());
}
- public static ExecutorUpdate fromThrowable(Throwable t) {
+ public static ExecutorUpdate fromException(Exception e) {
return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate(
Optional.<CommittedBundle<?>>absent(),
Collections.<AppliedPTransform<?, ?, ?>>emptyList(),
- Optional.of(t));
+ Optional.of(e));
}
/**
@@ -336,7 +336,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
*/
public abstract Collection<AppliedPTransform<?, ?, ?>> getConsumers();
- public abstract Optional<? extends Throwable> getException();
+ public abstract Optional<? extends Exception> getException();
}
/**
@@ -344,10 +344,10 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
* return normally or throw an exception.
*/
private static class VisibleExecutorUpdate {
- private final Optional<? extends Throwable> throwable;
+ private final Optional<? extends Exception> exception;
private final boolean done;
- public static VisibleExecutorUpdate fromThrowable(Throwable e) {
+ public static VisibleExecutorUpdate fromException(Exception e) {
return new VisibleExecutorUpdate(false, e);
}
@@ -355,8 +355,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
return new VisibleExecutorUpdate(true, null);
}
- private VisibleExecutorUpdate(boolean done, @Nullable Throwable exception) {
- this.throwable = Optional.fromNullable(exception);
+ private VisibleExecutorUpdate(boolean done, @Nullable Exception exception) {
+ this.exception = Optional.fromNullable(exception);
this.done = done;
}
@@ -410,7 +410,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
allUpdates.offer(update);
}
} else if (update.getException().isPresent()) {
- visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
+ visibleUpdates.offer(VisibleExecutorUpdate.fromException(update.getException().get()));
exceptionThrown = true;
}
}
@@ -418,12 +418,12 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Monitor died due to being interrupted");
- while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(e))) {
+ while (!visibleUpdates.offer(VisibleExecutorUpdate.fromException(e))) {
visibleUpdates.poll();
}
- } catch (Throwable t) {
- LOG.error("Monitor thread died due to throwable", t);
- while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(t))) {
+ } catch (Exception t) {
+ LOG.error("Monitor thread died due to exception", t);
+ while (!visibleUpdates.offer(VisibleExecutorUpdate.fromException(t))) {
visibleUpdates.poll();
}
} finally {
@@ -478,7 +478,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
try {
registry.cleanup();
} catch (Exception e) {
- visibleUpdates.add(VisibleExecutorUpdate.fromThrowable(e));
+ visibleUpdates.add(VisibleExecutorUpdate.fromException(e));
}
if (evaluationContext.isDone()) {
while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e51c840/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
index 01a5c54..f900a22 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
@@ -43,5 +43,5 @@ interface PipelineExecutor {
* @throws Throwable whenever an executor thread throws anything, transfers the throwable to the
* waiting thread and rethrows it
*/
- void awaitCompletion() throws Throwable;
+ void awaitCompletion() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e51c840/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index 03f615b..c4002b5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -121,12 +121,12 @@ class TransformExecutor<T> implements Runnable {
processElements(evaluator, metricsContainer, enforcements);
finishBundle(evaluator, metricsContainer, enforcements);
- } catch (Throwable t) {
- onComplete.handleThrowable(inputBundle, t);
- if (t instanceof RuntimeException) {
- throw (RuntimeException) t;
+ } catch (Exception e) {
+ onComplete.handleException(inputBundle, e);
+ if (e instanceof RuntimeException) {
+ throw (RuntimeException) e;
}
- throw new RuntimeException(t);
+ throw new RuntimeException(e);
} finally {
// Report the physical metrics from the end of this step.
context.getMetrics().commitPhysical(inputBundle, metricsContainer.getCumulative());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e51c840/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 5015e5a..32f874d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -130,7 +130,7 @@ public class TransformExecutorTest {
assertThat(finishCalled.get(), is(true));
assertThat(completionCallback.handledResult, equalTo(result));
- assertThat(completionCallback.handledThrowable, is(nullValue()));
+ assertThat(completionCallback.handledException, is(nullValue()));
}
@Test
@@ -150,7 +150,7 @@ public class TransformExecutorTest {
assertThat(completionCallback.handledResult, is(nullValue()));
assertThat(completionCallback.handledEmpty, equalTo(true));
- assertThat(completionCallback.handledThrowable, is(nullValue()));
+ assertThat(completionCallback.handledException, is(nullValue()));
}
@Test
@@ -196,7 +196,7 @@ public class TransformExecutorTest {
assertThat(elementsProcessed, containsInAnyOrder(spam, third, foo));
assertThat(completionCallback.handledResult, equalTo(result));
- assertThat(completionCallback.handledThrowable, is(nullValue()));
+ assertThat(completionCallback.handledException, is(nullValue()));
}
@Test
@@ -237,7 +237,7 @@ public class TransformExecutorTest {
evaluatorCompleted.await();
assertThat(completionCallback.handledResult, is(nullValue()));
- assertThat(completionCallback.handledThrowable, Matchers.<Throwable>equalTo(exception));
+ assertThat(completionCallback.handledException, Matchers.<Throwable>equalTo(exception));
}
@Test
@@ -273,7 +273,7 @@ public class TransformExecutorTest {
evaluatorCompleted.await();
assertThat(completionCallback.handledResult, is(nullValue()));
- assertThat(completionCallback.handledThrowable, Matchers.<Throwable>equalTo(exception));
+ assertThat(completionCallback.handledException, Matchers.<Throwable>equalTo(exception));
}
@Test
@@ -479,7 +479,7 @@ public class TransformExecutorTest {
private static class RegisteringCompletionCallback implements CompletionCallback {
private TransformResult handledResult = null;
private boolean handledEmpty = false;
- private Throwable handledThrowable = null;
+ private Exception handledException = null;
private final CountDownLatch onMethod;
private RegisteringCompletionCallback(CountDownLatch onMethod) {
@@ -512,8 +512,8 @@ public class TransformExecutorTest {
}
@Override
- public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
- handledThrowable = t;
+ public void handleException(CommittedBundle<?> inputBundle, Exception e) {
+ handledException = e;
onMethod.countDown();
}
}