You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2016/12/07 07:29:33 UTC

[1/2] incubator-beam git commit: [FLINK-1102] Fix Aggregator Registration in Flink Batch Runner

Repository: incubator-beam
Updated Branches:
  refs/heads/master 0a2ed832c -> b41a46e86


[FLINK-1102] Fix Aggregator Registration in Flink Batch Runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/869b2710
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/869b2710
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/869b2710

Branch: refs/heads/master
Commit: 869b2710efdb90bc3ce5b6e9d4f3b49a3a804a63
Parents: 0a2ed83
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Dec 7 13:28:13 2016 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Dec 7 15:25:44 2016 +0800

----------------------------------------------------------------------
 .../functions/FlinkProcessContextBase.java      | 21 +++++++++-----------
 1 file changed, 9 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/869b2710/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
index 42607dd..6afca38 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.flink.translation.functions;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.collect.Iterables;
-import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -39,7 +38,6 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.joda.time.Instant;
 
@@ -256,15 +254,14 @@ abstract class FlinkProcessContextBase<InputT, OutputT>
   @Override
   protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
   createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-    SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper =
-        new SerializableFnAggregatorWrapper<>(combiner);
-    Accumulator<?, ?> existingAccum =
-        (Accumulator<AggInputT, Serializable>) runtimeContext.getAccumulator(name);
-    if (existingAccum != null) {
-      return wrapper;
-    } else {
-      runtimeContext.addAccumulator(name, wrapper);
+    @SuppressWarnings("unchecked")
+    SerializableFnAggregatorWrapper<AggInputT, AggOutputT> result =
+        (SerializableFnAggregatorWrapper<AggInputT, AggOutputT>)
+            runtimeContext.getAccumulator(name);
+
+    if (result == null) {
+      result = new SerializableFnAggregatorWrapper<>(combiner);
+      runtimeContext.addAccumulator(name, result);
     }
-    return wrapper;
-  }
+    return result;  }
 }


[2/2] incubator-beam git commit: This closes #1530

Posted by al...@apache.org.
This closes #1530


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b41a46e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b41a46e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b41a46e8

Branch: refs/heads/master
Commit: b41a46e86fd38c4a887f31bdf6cb75969f4750d3
Parents: 0a2ed83 869b271
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Dec 7 15:26:02 2016 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Dec 7 15:26:02 2016 +0800

----------------------------------------------------------------------
 .../functions/FlinkProcessContextBase.java      | 21 +++++++++-----------
 1 file changed, 9 insertions(+), 12 deletions(-)
----------------------------------------------------------------------