You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by av...@apache.org on 2017/05/04 18:10:34 UTC

[2/6] beam git commit: [BEAM-1726] Fix Flatten with input copies in Flink Streaming Runner

[BEAM-1726] Fix Flatten with input copies in Flink Streaming Runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0e2bb180
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0e2bb180
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0e2bb180

Branch: refs/heads/master
Commit: 0e2bb1808350cbebf771d0971deb06787732800d
Parents: 7c44935
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sun Mar 19 07:49:08 2017 +0100
Committer: Aviem Zur <av...@gmail.com>
Committed: Thu May 4 20:48:56 2017 +0300

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     | 26 ++++++++++++++++++++
 1 file changed, 26 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0e2bb180/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index c024493..7339c01 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -966,10 +966,36 @@ class FlinkStreamingTransformTranslators {
 
       } else {
         DataStream<T> result = null;
+
+        // Count how often each DataStream is used as an input. Streams used more than once must
+        // be made unique before the union, because Flink seems to swallow watermarks when a
+        // stream is unioned with itself.
+        Map<DataStream<T>, Integer> duplicates = new HashMap<>();
+        for (PValue input : allInputs.values()) {
+          DataStream<T> current = context.getInputDataStream(input);
+          Integer oldValue = duplicates.put(current, 1);
+          if (oldValue != null) {
+            duplicates.put(current, oldValue + 1);
+          }
+        }
+
         for (PValue input : allInputs.values()) {
           DataStream<T> current = context.getInputDataStream(input);
+
+          final Integer timesRequired = duplicates.get(current);
+          if (timesRequired > 1) {
+            current = current.flatMap(new FlatMapFunction<T, T>() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public void flatMap(T t, Collector<T> collector) throws Exception {
+                collector.collect(t);
+              }
+            });
+          }
           result = (result == null) ? current : result.union(current);
         }
+
         context.setOutputDataStream(context.getOutput(transform), result);
       }
     }