You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/10/26 16:43:44 UTC
[06/50] [abbrv] incubator-beam git commit: Make aggregator registration idempotent in FlinkRunner
Make aggregator registration idempotent in FlinkRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2089c5cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2089c5cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2089c5cd
Branch: refs/heads/gearpump-runner
Commit: 2089c5cd2662a2eeea39ac7ebd1bfd8bcdc1aa16
Parents: 1919d8b
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Oct 23 21:26:48 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 21:26:48 2016 -0700
----------------------------------------------------------------------
.../flink/translation/functions/FlinkProcessContext.java | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2089c5cd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
index fa5eb1a..baf97cb 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.Iterables;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -40,6 +41,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.util.Collector;
import org.joda.time.Instant;
@@ -316,7 +318,13 @@ class FlinkProcessContext<InputT, OutputT>
createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper =
new SerializableFnAggregatorWrapper<>(combiner);
- runtimeContext.addAccumulator(name, wrapper);
+ Accumulator<?, ?> existingAccum =
+ (Accumulator<AggInputT, Serializable>) runtimeContext.getAccumulator(name);
+ if (existingAccum != null) {
+ return wrapper;
+ } else {
+ runtimeContext.addAccumulator(name, wrapper);
+ }
return wrapper;
}
}