You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/21 02:53:11 UTC
[3/7] incubator-beam git commit: Simplify DoFnRunners to be somewhat
compositional
Simplify DoFnRunners to be somewhat compositional
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f043865e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f043865e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f043865e
Branch: refs/heads/master
Commit: f043865ea9d842b85e21687c4503067a530960e6
Parents: 2d1df8b
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 10 14:30:53 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 20 18:32:06 2016 -0700
----------------------------------------------------------------------
.../apache/beam/runners/core/DoFnRunners.java | 67 +++++++++-----------
1 file changed, 29 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f043865e/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 7726374..41eb9d1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -80,31 +81,15 @@ public class DoFnRunners {
*/
public static <K, InputT, OutputT, W extends BoundedWindow>
DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
- PipelineOptions options,
- ReduceFnExecutor<K, InputT, OutputT, W> reduceFnExecutor,
- SideInputReader sideInputReader,
- OutputManager outputManager,
- TupleTag<KV<K, OutputT>> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
+ DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> wrappedRunner,
StepContext stepContext,
- AggregatorFactory aggregatorFactory,
- WindowingStrategy<?, W> windowingStrategy) {
- DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> simpleDoFnRunner =
- simpleRunner(
- options,
- reduceFnExecutor.asDoFn(),
- sideInputReader,
- outputManager,
- mainOutputTag,
- sideOutputTags,
- stepContext,
- aggregatorFactory,
- windowingStrategy);
+ WindowingStrategy<?, W> windowingStrategy,
+ Aggregator<Long, Long> droppedDueToLatenessAggregator) {
return new LateDataDroppingDoFnRunner<>(
- simpleDoFnRunner,
+ wrappedRunner,
windowingStrategy,
stepContext.timerInternals(),
- reduceFnExecutor.getDroppedDueToLatenessAggregator());
+ droppedDueToLatenessAggregator);
}
@@ -118,23 +103,8 @@ public class DoFnRunners {
StepContext stepContext,
AggregatorFactory aggregatorFactory,
WindowingStrategy<?, ?> windowingStrategy) {
- if (doFn instanceof ReduceFnExecutor) {
- @SuppressWarnings("rawtypes")
- ReduceFnExecutor fn = (ReduceFnExecutor) doFn;
- @SuppressWarnings({"unchecked", "cast", "rawtypes"})
- DoFnRunner<InputT, OutputT> runner = (DoFnRunner<InputT, OutputT>) lateDataDroppingRunner(
- options,
- fn,
- sideInputReader,
- outputManager,
- (TupleTag) mainOutputTag,
- sideOutputTags,
- stepContext,
- aggregatorFactory,
- (WindowingStrategy) windowingStrategy);
- return runner;
- }
- return simpleRunner(
+
+ DoFnRunner<InputT, OutputT> doFnRunner = simpleRunner(
options,
doFn,
sideInputReader,
@@ -144,5 +114,26 @@ public class DoFnRunners {
stepContext,
aggregatorFactory,
windowingStrategy);
+
+ if (!(doFn instanceof ReduceFnExecutor)) {
+ return doFnRunner;
+ } else {
+ // When a DoFn is a ReduceFnExecutor, we know it has to have an aggregator for dropped
+ // elements and we also learn that for some K and V,
+ // InputT = KeyedWorkItem<K, V>
+ // OutputT = KV<K, V>
+
+ Aggregator<Long, Long> droppedDueToLatenessAggregator =
+ ((ReduceFnExecutor<?, ?, ?, ?>) doFn).getDroppedDueToLatenessAggregator();
+
+ @SuppressWarnings({"unchecked", "cast", "rawtypes"})
+ DoFnRunner<InputT, OutputT> runner = (DoFnRunner<InputT, OutputT>) lateDataDroppingRunner(
+ (DoFnRunner) doFnRunner,
+ stepContext,
+ (WindowingStrategy) windowingStrategy,
+ droppedDueToLatenessAggregator);
+
+ return runner;
+ }
}
}