You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2016/12/08 01:56:10 UTC
[1/2] incubator-beam git commit: [BEAM-1096] Flink streaming side output optimization using SplitStream
Repository: incubator-beam
Updated Branches:
refs/heads/master c53e0b162 -> 6807480a9
[BEAM-1096] Flink streaming side output optimization using SplitStream
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f1a5704a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f1a5704a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f1a5704a
Branch: refs/heads/master
Commit: f1a5704a505b01d7d4649b61d1f6697859367964
Parents: c53e0b1
Author: Alexey Diomin <di...@gmail.com>
Authored: Wed Dec 7 09:48:35 2016 +0400
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Dec 8 09:55:22 2016 +0800
----------------------------------------------------------------------
.../FlinkStreamingTransformTranslators.java | 28 +++++++++++++-------
1 file changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f1a5704a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 47935eb..7b32c76 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -78,11 +78,13 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
@@ -554,6 +556,14 @@ public class FlinkStreamingTransformTranslators {
.transform(transform.getName(), outputUnionTypeInformation, doFnOperator);
}
+ SplitStream<RawUnionValue> splitStream = unionOutputStream
+ .split(new OutputSelector<RawUnionValue>() {
+ @Override
+ public Iterable<String> select(RawUnionValue value) {
+ return Collections.singletonList(Integer.toString(value.getUnionTag()));
+ }
+ });
+
for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
final int outputTag = tagsToLabels.get(output.getKey());
@@ -561,17 +571,15 @@ public class FlinkStreamingTransformTranslators {
context.getTypeInfo(output.getValue());
@SuppressWarnings("unchecked")
- DataStream filtered =
- unionOutputStream.flatMap(new FlatMapFunction<RawUnionValue, Object>() {
- @Override
- public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
- if (value.getUnionTag() == outputTag) {
- out.collect(value.getValue());
- }
- }
- }).returns(outputTypeInfo);
+ DataStream unwrapped = splitStream.select(String.valueOf(outputTag))
+ .flatMap(new FlatMapFunction<RawUnionValue, Object>() {
+ @Override
+ public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
+ out.collect(value.getValue());
+ }
+ }).returns(outputTypeInfo);
- context.setOutputDataStream(output.getValue(), filtered);
+ context.setOutputDataStream(output.getValue(), unwrapped);
}
}
[2/2] incubator-beam git commit: This closes #1520
Posted by al...@apache.org.
This closes #1520
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6807480a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6807480a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6807480a
Branch: refs/heads/master
Commit: 6807480a97f2315b3f48ad8dd5accb4e30475fa4
Parents: c53e0b1 f1a5704
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Dec 8 09:55:32 2016 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Dec 8 09:55:32 2016 +0800
----------------------------------------------------------------------
.../FlinkStreamingTransformTranslators.java | 28 +++++++++++++-------
1 file changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------