You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/05/26 12:41:59 UTC

[1/2] beam git commit: fix FlinkAccumulatorCombiningStateWithContext read null accum bug

Repository: beam
Updated Branches:
  refs/heads/master deee5b3c2 -> 7568f0298


fix FlinkAccumulatorCombiningStateWithContext read null accum bug


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/de38410d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/de38410d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/de38410d

Branch: refs/heads/master
Commit: de38410d3e2cf9c6edff9438d539929777ad7915
Parents: deee5b3
Author: 波特 <ha...@alibaba-inc.com>
Authored: Thu Apr 20 20:27:31 2017 +0800
Committer: Pei He <he...@alibaba-inc.com>
Committed: Fri May 26 20:40:59 2017 +0800

----------------------------------------------------------------------
 .../wrappers/streaming/state/FlinkStateInternals.java          | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/de38410d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index 9cb742e..b73abe9 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -821,7 +821,11 @@ public class FlinkStateInternals<K> implements StateInternals {
                 flinkStateDescriptor);
 
         AccumT accum = state.value();
-        return combineFn.extractOutput(accum, context);
+        if (accum != null) {
+          return combineFn.extractOutput(accum, context);
+        } else {
+          return combineFn.extractOutput(combineFn.createAccumulator(context), context);
+        }
       } catch (Exception e) {
         throw new RuntimeException("Error reading state.", e);
       }


[2/2] beam git commit: This closes #3234

Posted by pe...@apache.org.
This closes #3234


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7568f029
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7568f029
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7568f029

Branch: refs/heads/master
Commit: 7568f02986db9a31e38f8fa57a1137121a086b2d
Parents: deee5b3 de38410
Author: Pei He <he...@alibaba-inc.com>
Authored: Fri May 26 20:41:38 2017 +0800
Committer: Pei He <he...@alibaba-inc.com>
Committed: Fri May 26 20:41:38 2017 +0800

----------------------------------------------------------------------
 .../wrappers/streaming/state/FlinkStateInternals.java          | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------