You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/25 16:28:22 UTC

[04/50] incubator-beam git commit: Simplify DoFnRunners to be somewhat compositional

Simplify DoFnRunners to be somewhat compositional


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f043865e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f043865e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f043865e

Branch: refs/heads/apex-runner
Commit: f043865ea9d842b85e21687c4503067a530960e6
Parents: 2d1df8b
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 10 14:30:53 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 20 18:32:06 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/core/DoFnRunners.java   | 67 +++++++++-----------
 1 file changed, 29 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f043865e/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 7726374..41eb9d1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -80,31 +81,15 @@ public class DoFnRunners {
    */
   public static <K, InputT, OutputT, W extends BoundedWindow>
       DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
-          PipelineOptions options,
-          ReduceFnExecutor<K, InputT, OutputT, W> reduceFnExecutor,
-          SideInputReader sideInputReader,
-          OutputManager outputManager,
-          TupleTag<KV<K, OutputT>> mainOutputTag,
-          List<TupleTag<?>> sideOutputTags,
+          DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> wrappedRunner,
           StepContext stepContext,
-          AggregatorFactory aggregatorFactory,
-          WindowingStrategy<?, W> windowingStrategy) {
-    DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> simpleDoFnRunner =
-        simpleRunner(
-            options,
-            reduceFnExecutor.asDoFn(),
-            sideInputReader,
-            outputManager,
-            mainOutputTag,
-            sideOutputTags,
-            stepContext,
-            aggregatorFactory,
-            windowingStrategy);
+          WindowingStrategy<?, W> windowingStrategy,
+          Aggregator<Long, Long> droppedDueToLatenessAggregator) {
     return new LateDataDroppingDoFnRunner<>(
-        simpleDoFnRunner,
+        wrappedRunner,
         windowingStrategy,
         stepContext.timerInternals(),
-        reduceFnExecutor.getDroppedDueToLatenessAggregator());
+        droppedDueToLatenessAggregator);
   }
 
 
@@ -118,23 +103,8 @@ public class DoFnRunners {
       StepContext stepContext,
       AggregatorFactory aggregatorFactory,
       WindowingStrategy<?, ?> windowingStrategy) {
-    if (doFn instanceof ReduceFnExecutor) {
-      @SuppressWarnings("rawtypes")
-      ReduceFnExecutor fn = (ReduceFnExecutor) doFn;
-      @SuppressWarnings({"unchecked", "cast", "rawtypes"})
-      DoFnRunner<InputT, OutputT> runner = (DoFnRunner<InputT, OutputT>) lateDataDroppingRunner(
-          options,
-          fn,
-          sideInputReader,
-          outputManager,
-          (TupleTag) mainOutputTag,
-          sideOutputTags,
-          stepContext,
-          aggregatorFactory,
-          (WindowingStrategy) windowingStrategy);
-      return runner;
-    }
-    return simpleRunner(
+
+    DoFnRunner<InputT, OutputT> doFnRunner = simpleRunner(
         options,
         doFn,
         sideInputReader,
@@ -144,5 +114,26 @@ public class DoFnRunners {
         stepContext,
         aggregatorFactory,
         windowingStrategy);
+
+    if (!(doFn instanceof ReduceFnExecutor)) {
+      return doFnRunner;
+    } else {
+      // When a DoFn is a ReduceFnExecutor, we know it has to have an aggregator for dropped
+      // elements and we also learn that for some K and V,
+      //   InputT = KeyedWorkItem<K, V>
+      //   OutputT = KV<K, V>
+
+      Aggregator<Long, Long> droppedDueToLatenessAggregator =
+          ((ReduceFnExecutor<?, ?, ?, ?>) doFn).getDroppedDueToLatenessAggregator();
+
+      @SuppressWarnings({"unchecked", "cast", "rawtypes"})
+      DoFnRunner<InputT, OutputT> runner = (DoFnRunner<InputT, OutputT>) lateDataDroppingRunner(
+          (DoFnRunner) doFnRunner,
+          stepContext,
+          (WindowingStrategy) windowingStrategy,
+          droppedDueToLatenessAggregator);
+
+      return runner;
+    }
   }
 }