You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/24 16:11:07 UTC

[13/14] incubator-beam git commit: Make aggregator registration idempotent in FlinkRunner

Make aggregator registration idempotent in FlinkRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2089c5cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2089c5cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2089c5cd

Branch: refs/heads/master
Commit: 2089c5cd2662a2eeea39ac7ebd1bfd8bcdc1aa16
Parents: 1919d8b
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Oct 23 21:26:48 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 21:26:48 2016 -0700

----------------------------------------------------------------------
 .../flink/translation/functions/FlinkProcessContext.java  | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2089c5cd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
index fa5eb1a..baf97cb 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.collect.Iterables;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -40,6 +41,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.util.Collector;
 import org.joda.time.Instant;
@@ -316,7 +318,13 @@ class FlinkProcessContext<InputT, OutputT>
   createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
     SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper =
         new SerializableFnAggregatorWrapper<>(combiner);
-    runtimeContext.addAccumulator(name, wrapper);
+    Accumulator<?, ?> existingAccum =
+        (Accumulator<AggInputT, Serializable>) runtimeContext.getAccumulator(name);
+    if (existingAccum != null) {
+      return wrapper;
+    } else {
+      runtimeContext.addAccumulator(name, wrapper);
+    }
     return wrapper;
   }
 }