You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2016/12/07 07:29:33 UTC
[1/2] incubator-beam git commit: [FLINK-1102] Fix Aggregator Registration in Flink Batch Runner
Repository: incubator-beam
Updated Branches:
refs/heads/master 0a2ed832c -> b41a46e86
[FLINK-1102] Fix Aggregator Registration in Flink Batch Runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/869b2710
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/869b2710
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/869b2710
Branch: refs/heads/master
Commit: 869b2710efdb90bc3ce5b6e9d4f3b49a3a804a63
Parents: 0a2ed83
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Dec 7 13:28:13 2016 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Dec 7 15:25:44 2016 +0800
----------------------------------------------------------------------
.../functions/FlinkProcessContextBase.java | 21 +++++++++-----------
1 file changed, 9 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/869b2710/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
index 42607dd..6afca38 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.flink.translation.functions;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.Iterables;
-import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -39,7 +38,6 @@ import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.joda.time.Instant;
@@ -256,15 +254,14 @@ abstract class FlinkProcessContextBase<InputT, OutputT>
@Override
protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
- SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper =
- new SerializableFnAggregatorWrapper<>(combiner);
- Accumulator<?, ?> existingAccum =
- (Accumulator<AggInputT, Serializable>) runtimeContext.getAccumulator(name);
- if (existingAccum != null) {
- return wrapper;
- } else {
- runtimeContext.addAccumulator(name, wrapper);
+ @SuppressWarnings("unchecked")
+ SerializableFnAggregatorWrapper<AggInputT, AggOutputT> result =
+ (SerializableFnAggregatorWrapper<AggInputT, AggOutputT>)
+ runtimeContext.getAccumulator(name);
+
+ if (result == null) {
+ result = new SerializableFnAggregatorWrapper<>(combiner);
+ runtimeContext.addAccumulator(name, result);
}
- return wrapper;
- }
+ return result; }
}
[2/2] incubator-beam git commit: This closes #1530
Posted by al...@apache.org.
This closes #1530
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b41a46e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b41a46e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b41a46e8
Branch: refs/heads/master
Commit: b41a46e86fd38c4a887f31bdf6cb75969f4750d3
Parents: 0a2ed83 869b271
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Dec 7 15:26:02 2016 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Dec 7 15:26:02 2016 +0800
----------------------------------------------------------------------
.../functions/FlinkProcessContextBase.java | 21 +++++++++-----------
1 file changed, 9 insertions(+), 12 deletions(-)
----------------------------------------------------------------------