You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by av...@apache.org on 2017/05/04 18:10:37 UTC

[5/6] beam git commit: [BEAM-1726] Fix RuntimeException throwing in FlinkStateInternals

[BEAM-1726] Fix RuntimeException throwing in FlinkStateInternals


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7c44935e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7c44935e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7c44935e

Branch: refs/heads/master
Commit: 7c44935e1c47cce2ecfe842e37c2cf89f48d8583
Parents: 5555040
Author: Aviem Zur <av...@gmail.com>
Authored: Sat Mar 18 15:21:45 2017 +0200
Committer: Aviem Zur <av...@gmail.com>
Committed: Thu May 4 20:48:56 2017 +0300

----------------------------------------------------------------------
 .../translation/wrappers/streaming/state/FlinkStateInternals.java  | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7c44935e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index c033be6..cea6e0f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -592,6 +592,8 @@ public class FlinkStateInternals<K> implements StateInternals {
         }
         current = combineFn.addInput(current, value);
         state.update(current);
+      } catch (RuntimeException re) {
+        throw re;
       } catch (Exception e) {
         throw new RuntimeException("Error adding to state." , e);
       }