You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/25 16:28:45 UTC

[27/50] incubator-beam git commit: Make DoFnTester aggregator initialization idempotent

Make DoFnTester aggregator initialization idempotent


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/043ebeca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/043ebeca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/043ebeca

Branch: refs/heads/apex-runner
Commit: 043ebecacf7a8e96939b025afa8480c6df2f3b41
Parents: 2ab955d
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 13:35:29 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 21:04:17 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/DoFnTester.java    | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/043ebeca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 302bb02..7995719 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -543,6 +543,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     private <AinT, AccT, AoutT> Aggregator<AinT, AoutT> aggregator(
         final String name,
         final CombineFn<AinT, AccT, AoutT> combiner) {
+
       Aggregator<AinT, AoutT> aggregator = new Aggregator<AinT, AoutT>() {
         @Override
         public void addValue(AinT value) {
@@ -561,7 +562,22 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
           return combiner;
         }
       };
-      accumulators.put(name, combiner.createAccumulator());
+
+      // Aggregator instantiation is idempotent
+      if (accumulators.containsKey(name)) {
+        Class<?> currentAccumClass = accumulators.get(name).getClass();
+        Class<?> createAccumClass = combiner.createAccumulator().getClass();
+        checkState(
+            currentAccumClass.isAssignableFrom(createAccumClass),
+            "Aggregator %s already initialized with accumulator type %s "
+                + "but was re-initialized with accumulator type %s",
+            name,
+            currentAccumClass,
+            createAccumClass);
+
+      } else {
+        accumulators.put(name, combiner.createAccumulator());
+      }
       return aggregator;
     }