You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/05 19:54:33 UTC
[1/2] incubator-beam git commit: Refactor CompletionCallbacks
Repository: incubator-beam
Updated Branches:
refs/heads/master e63311fa4 -> 51e1e59b8
Refactor CompletionCallbacks
The default and timer completion callbacks are identical except for
their calls to evaluationContext.handleResult; factor the shared code
into a common base class.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2adf45f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2adf45f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2adf45f3
Branch: refs/heads/master
Commit: 2adf45f31a0253be0ab3f3cc74b65e0aee584e37
Parents: 9b9d73f
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 3 13:22:13 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue May 3 14:33:38 2016 -0700
----------------------------------------------------------------------
.../direct/ExecutorServiceParallelExecutor.java | 51 ++++++++++++--------
1 file changed, 30 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2adf45f3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 18af363..9f26e5a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -210,16 +210,20 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
}
/**
- * The default {@link CompletionCallback}. The default completion callback is used to complete
- * transform evaluations that are triggered due to the arrival of elements from an upstream
- * transform, or for a source transform.
+ * The base implementation of {@link CompletionCallback} that provides implementations for
+ * {@link #handleResult(CommittedBundle, InProcessTransformResult)} and
+ * {@link #handleThrowable(CommittedBundle, Throwable)}, given an implementation of
+ * {@link #getCommittedResult(CommittedBundle, InProcessTransformResult)}.
*/
- private class DefaultCompletionCallback implements CompletionCallback {
+ private abstract class CompletionCallbackBase implements CompletionCallback {
+ protected abstract CommittedResult getCommittedResult(
+ CommittedBundle<?> inputBundle,
+ InProcessTransformResult result);
+
@Override
- public CommittedResult handleResult(
+ public final CommittedResult handleResult(
CommittedBundle<?> inputBundle, InProcessTransformResult result) {
- CommittedResult committedResult =
- evaluationContext.handleResult(inputBundle, Collections.<TimerData>emptyList(), result);
+ CommittedResult committedResult = getCommittedResult(inputBundle, result);
for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
}
@@ -227,18 +231,33 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
}
@Override
- public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
+ public final void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
allUpdates.offer(ExecutorUpdate.fromThrowable(t));
}
}
/**
+ * The default {@link CompletionCallback}. The default completion callback is used to complete
+ * transform evaluations that are triggered due to the arrival of elements from an upstream
+ * transform, or for a source transform.
+ */
+ private class DefaultCompletionCallback extends CompletionCallbackBase {
+ @Override
+ public CommittedResult getCommittedResult(
+ CommittedBundle<?> inputBundle, InProcessTransformResult result) {
+ return evaluationContext.handleResult(inputBundle,
+ Collections.<TimerData>emptyList(),
+ result);
+ }
+ }
+
+ /**
* A {@link CompletionCallback} where the completed bundle was produced to deliver some collection
* of {@link TimerData timers}. When the evaluator completes successfully, reports all of the
* timers used to create the input to the {@link InProcessEvaluationContext evaluation context}
* as part of the result.
*/
- private class TimerCompletionCallback implements CompletionCallback {
+ private class TimerCompletionCallback extends CompletionCallbackBase {
private final Iterable<TimerData> timers;
private TimerCompletionCallback(Iterable<TimerData> timers) {
@@ -246,19 +265,9 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
}
@Override
- public CommittedResult handleResult(
+ public CommittedResult getCommittedResult(
CommittedBundle<?> inputBundle, InProcessTransformResult result) {
- CommittedResult committedResult =
- evaluationContext.handleResult(inputBundle, timers, result);
- for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
- allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
- }
- return committedResult;
- }
-
- @Override
- public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
- allUpdates.offer(ExecutorUpdate.fromThrowable(t));
+ return evaluationContext.handleResult(inputBundle, timers, result);
}
}
[2/2] incubator-beam git commit: This closes #282
Posted by ke...@apache.org.
This closes #282
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51e1e59b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51e1e59b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51e1e59b
Branch: refs/heads/master
Commit: 51e1e59b8988d7caf3b924378e2ff95037fca4d3
Parents: e63311f 2adf45f
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 5 12:54:08 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 5 12:54:08 2016 -0700
----------------------------------------------------------------------
.../direct/ExecutorServiceParallelExecutor.java | 51 ++++++++++++--------
1 file changed, 30 insertions(+), 21 deletions(-)
----------------------------------------------------------------------