You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/25 16:28:36 UTC
[18/50] incubator-beam git commit: Add setupDelegatingAggregators for DoFn (for now)
Add setupDelegatingAggregators for DoFn (for now)
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c2e751f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c2e751f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c2e751f4
Branch: refs/heads/apex-runner
Commit: c2e751f49d72968f2478931cdb884fd4af173610
Parents: 08dd149
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 11:53:29 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:51 2016 -0700
----------------------------------------------------------------------
.../beam/runners/core/SimpleDoFnRunner.java | 1 +
.../org/apache/beam/sdk/transforms/DoFn.java | 24 ++++++++++++++++++++
2 files changed, 25 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2e751f4/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 0360bc2..1cf56a6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -228,6 +228,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
this.stepContext = stepContext;
this.aggregatorFactory = aggregatorFactory;
this.windowFn = windowFn;
+ super.setupDelegateAggregators();
}
//////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2e751f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 0531cbb..11ca853 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -214,6 +214,30 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
protected abstract <AggInputT, AggOutputT>
Aggregator<AggInputT, AggOutputT> createAggregator(
String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
+
+ /**
+ * Sets up {@link Aggregator}s created by the {@link DoFn} so they are usable within this context.
+ *
+ * <p>Each registered {@link DelegatingAggregator} is given a delegate obtained from
+ * {@code createAggregator}; afterwards {@code aggregatorsAreFinal} is set to {@code true}.
+ *
+ * <p>This method should be called by runners before the {@link StartBundle @StartBundle} method.
+ */
+ @Experimental(Kind.AGGREGATOR)
+ protected final void setupDelegateAggregators() {
+ for (DelegatingAggregator<?, ?> aggregator : aggregators.values()) {
+ setupDelegateAggregator(aggregator);
+ }
+
+ // NOTE(review): presumably this flag stops further aggregators from being registered once
+ // delegates have been wired up -- confirm against the code that reads aggregatorsAreFinal.
+ aggregatorsAreFinal = true;
+ }
+
+ /**
+ * Creates a delegate for the given aggregator via {@code createAggregator}, using the
+ * aggregator's own name and combine function, and installs it with {@code setDelegate}.
+ *
+ * <p>This is a separate generic method so that the wildcard types of each
+ * {@code DelegatingAggregator<?, ?>} seen by the caller are captured as the named type
+ * parameters {@code AggInputT} and {@code AggOutputT}.
+ *
+ * @param aggregator the delegating aggregator that receives the newly created delegate
+ */
+ private <AggInputT, AggOutputT> void setupDelegateAggregator(
+ DelegatingAggregator<AggInputT, AggOutputT> aggregator) {
+
+ Aggregator<AggInputT, AggOutputT> delegate = createAggregator(
+ aggregator.getName(), aggregator.getCombineFn());
+
+ aggregator.setDelegate(delegate);
+ }
+
}
/**