You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/05 19:54:33 UTC

[1/2] incubator-beam git commit: Refactor CompletionCallbacks

Repository: incubator-beam
Updated Branches:
  refs/heads/master e63311fa4 -> 51e1e59b8


Refactor CompletionCallbacks

The default and timerful completion callbacks are identical, except for
their calls to evaluationContext.handleResult; factor the shared code into
a common base class.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2adf45f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2adf45f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2adf45f3

Branch: refs/heads/master
Commit: 2adf45f31a0253be0ab3f3cc74b65e0aee584e37
Parents: 9b9d73f
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 3 13:22:13 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue May 3 14:33:38 2016 -0700

----------------------------------------------------------------------
 .../direct/ExecutorServiceParallelExecutor.java | 51 ++++++++++++--------
 1 file changed, 30 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2adf45f3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 18af363..9f26e5a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -210,16 +210,20 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
   }
 
   /**
-   * The default {@link CompletionCallback}. The default completion callback is used to complete
-   * transform evaluations that are triggered due to the arrival of elements from an upstream
-   * transform, or for a source transform.
+   * The base implementation of {@link CompletionCallback} that provides implementations for
+   * {@link #handleResult(CommittedBundle, InProcessTransformResult)} and
+   * {@link #handleThrowable(CommittedBundle, Throwable)}, given an implementation of
+   * {@link #getCommittedResult(CommittedBundle, InProcessTransformResult)}.
    */
-  private class DefaultCompletionCallback implements CompletionCallback {
+  private abstract class CompletionCallbackBase implements CompletionCallback {
+    protected abstract CommittedResult getCommittedResult(
+        CommittedBundle<?> inputBundle,
+        InProcessTransformResult result);
+
     @Override
-    public CommittedResult handleResult(
+    public final CommittedResult handleResult(
         CommittedBundle<?> inputBundle, InProcessTransformResult result) {
-      CommittedResult committedResult =
-          evaluationContext.handleResult(inputBundle, Collections.<TimerData>emptyList(), result);
+      CommittedResult committedResult = getCommittedResult(inputBundle, result);
       for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
         allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
       }
@@ -227,18 +231,33 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
     }
 
     @Override
-    public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
+    public final void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
       allUpdates.offer(ExecutorUpdate.fromThrowable(t));
     }
   }
 
   /**
+   * The default {@link CompletionCallback}. The default completion callback is used to complete
+   * transform evaluations that are triggered due to the arrival of elements from an upstream
+   * transform, or for a source transform.
+   */
+  private class DefaultCompletionCallback extends CompletionCallbackBase {
+    @Override
+    public CommittedResult getCommittedResult(
+        CommittedBundle<?> inputBundle, InProcessTransformResult result) {
+      return evaluationContext.handleResult(inputBundle,
+          Collections.<TimerData>emptyList(),
+          result);
+    }
+  }
+
+  /**
    * A {@link CompletionCallback} where the completed bundle was produced to deliver some collection
    * of {@link TimerData timers}. When the evaluator completes successfully, reports all of the
    * timers used to create the input to the {@link InProcessEvaluationContext evaluation context}
    * as part of the result.
    */
-  private class TimerCompletionCallback implements CompletionCallback {
+  private class TimerCompletionCallback extends CompletionCallbackBase {
     private final Iterable<TimerData> timers;
 
     private TimerCompletionCallback(Iterable<TimerData> timers) {
@@ -246,19 +265,9 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
     }
 
     @Override
-    public CommittedResult handleResult(
+    public CommittedResult getCommittedResult(
         CommittedBundle<?> inputBundle, InProcessTransformResult result) {
-      CommittedResult committedResult =
-          evaluationContext.handleResult(inputBundle, timers, result);
-      for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
-        allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
-      }
-      return committedResult;
-    }
-
-    @Override
-    public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
-      allUpdates.offer(ExecutorUpdate.fromThrowable(t));
+          return evaluationContext.handleResult(inputBundle, timers, result);
     }
   }
 


[2/2] incubator-beam git commit: This closes #282

Posted by ke...@apache.org.
This closes #282


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51e1e59b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51e1e59b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51e1e59b

Branch: refs/heads/master
Commit: 51e1e59b8988d7caf3b924378e2ff95037fca4d3
Parents: e63311f 2adf45f
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 5 12:54:08 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 5 12:54:08 2016 -0700

----------------------------------------------------------------------
 .../direct/ExecutorServiceParallelExecutor.java | 51 ++++++++++++--------
 1 file changed, 30 insertions(+), 21 deletions(-)
----------------------------------------------------------------------