You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/25 16:28:36 UTC

[18/50] incubator-beam git commit: Add setupDelegatingAggregators for DoFn (for now)

Add setupDelegatingAggregators for DoFn (for now)


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c2e751f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c2e751f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c2e751f4

Branch: refs/heads/apex-runner
Commit: c2e751f49d72968f2478931cdb884fd4af173610
Parents: 08dd149
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 11:53:29 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:51 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/core/SimpleDoFnRunner.java     |  1 +
 .../org/apache/beam/sdk/transforms/DoFn.java    | 24 ++++++++++++++++++++
 2 files changed, 25 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2e751f4/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 0360bc2..1cf56a6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -228,6 +228,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       this.stepContext = stepContext;
       this.aggregatorFactory = aggregatorFactory;
       this.windowFn = windowFn;
+      super.setupDelegateAggregators();
     }
 
     //////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2e751f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 0531cbb..11ca853 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -214,6 +214,30 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
     protected abstract <AggInputT, AggOutputT>
         Aggregator<AggInputT, AggOutputT> createAggregator(
             String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
+
+    /**
+     * Sets up {@link Aggregator}s created by the {@link DoFn} so they are usable within this context.
+     *
+     * <p>This method should be called by runners before the {@link StartBundle @StartBundle} method.
+     */
+    @Experimental(Kind.AGGREGATOR)
+    protected final void setupDelegateAggregators() {
+      for (DelegatingAggregator<?, ?> aggregator : aggregators.values()) {
+        setupDelegateAggregator(aggregator);
+      }
+
+      aggregatorsAreFinal = true;
+    }
+
+    private <AggInputT, AggOutputT> void setupDelegateAggregator(
+        DelegatingAggregator<AggInputT, AggOutputT> aggregator) {
+
+      Aggregator<AggInputT, AggOutputT> delegate = createAggregator(
+          aggregator.getName(), aggregator.getCombineFn());
+
+      aggregator.setDelegate(delegate);
+    }
+
   }
 
   /**