You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2016/12/08 01:56:10 UTC

[1/2] incubator-beam git commit: [BEAM-1096] Flink streaming side output optimization using SplitStream

Repository: incubator-beam
Updated Branches:
  refs/heads/master c53e0b162 -> 6807480a9


[BEAM-1096] Flink streaming side output optimization using SplitStream


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f1a5704a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f1a5704a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f1a5704a

Branch: refs/heads/master
Commit: f1a5704a505b01d7d4649b61d1f6697859367964
Parents: c53e0b1
Author: Alexey Diomin <di...@gmail.com>
Authored: Wed Dec 7 09:48:35 2016 +0400
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Dec 8 09:55:22 2016 +0800

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     | 28 +++++++++++++-------
 1 file changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f1a5704a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 47935eb..7b32c76 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -78,11 +78,13 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
@@ -554,6 +556,14 @@ public class FlinkStreamingTransformTranslators {
             .transform(transform.getName(), outputUnionTypeInformation, doFnOperator);
       }
 
+      SplitStream<RawUnionValue> splitStream = unionOutputStream
+              .split(new OutputSelector<RawUnionValue>() {
+                @Override
+                public Iterable<String> select(RawUnionValue value) {
+                  return Collections.singletonList(Integer.toString(value.getUnionTag()));
+                }
+              });
+
       for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
         final int outputTag = tagsToLabels.get(output.getKey());
 
@@ -561,17 +571,15 @@ public class FlinkStreamingTransformTranslators {
             context.getTypeInfo(output.getValue());
 
         @SuppressWarnings("unchecked")
-        DataStream filtered =
-            unionOutputStream.flatMap(new FlatMapFunction<RawUnionValue, Object>() {
-              @Override
-              public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
-                if (value.getUnionTag() == outputTag) {
-                  out.collect(value.getValue());
-                }
-              }
-            }).returns(outputTypeInfo);
+        DataStream unwrapped = splitStream.select(String.valueOf(outputTag))
+          .flatMap(new FlatMapFunction<RawUnionValue, Object>() {
+            @Override
+            public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
+              out.collect(value.getValue());
+            }
+          }).returns(outputTypeInfo);
 
-        context.setOutputDataStream(output.getValue(), filtered);
+        context.setOutputDataStream(output.getValue(), unwrapped);
       }
     }
 


[2/2] incubator-beam git commit: This closes #1520

Posted by al...@apache.org.
This closes #1520


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6807480a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6807480a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6807480a

Branch: refs/heads/master
Commit: 6807480a97f2315b3f48ad8dd5accb4e30475fa4
Parents: c53e0b1 f1a5704
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Dec 8 09:55:32 2016 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Dec 8 09:55:32 2016 +0800

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     | 28 +++++++++++++-------
 1 file changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------