You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/05/15 16:53:00 UTC
flink git commit: [FLINK-6552] Allow differing types for side outputs
Repository: flink
Updated Branches:
refs/heads/master 55010d0bd -> 4651a1690
[FLINK-6552] Allow differing types for side outputs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4651a169
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4651a169
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4651a169
Branch: refs/heads/master
Commit: 4651a1690ac8d5784071eae1fad8ce179385cdaa
Parents: 55010d0
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri May 12 14:40:44 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon May 15 12:50:29 2017 -0400
----------------------------------------------------------------------
.../flink/streaming/api/graph/StreamGraph.java | 7 +--
.../streaming/runtime/SideOutputITCase.java | 55 ++++++++++++++++++--
.../runtime/util/TestListResultSink.java | 6 ++-
3 files changed, 60 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4651a169/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 5dd651c..2784517 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -318,9 +318,10 @@ public class StreamGraph extends StreamingPlan {
continue;
}
- if (!tag.f1.getTypeInfo().equals(outputTag.getTypeInfo())) {
- throw new IllegalArgumentException("Trying to add a side input for the same id " +
- "with a different type. This is not allowed.");
+ if (tag.f1.getId().equals(outputTag.getId()) &&
+ !tag.f1.getTypeInfo().equals(outputTag.getTypeInfo())) {
+ throw new IllegalArgumentException("Trying to add a side output for the same" +
+ "side-output id with a different type. This is not allowed.");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4651a169/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
index 27124cc..765eae5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
@@ -160,14 +160,25 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen
env.execute();
assertEquals(
- Arrays.asList("E:sideout-1", "E:sideout-2", "E:sideout-3", "E:sideout-4", "E:sideout-5", "WM:0", "WM:2", "WM:" + Long.MAX_VALUE),
+ Arrays.asList("E:sideout-1", "E:sideout-2", "E:sideout-3", "E:sideout-4", "E:sideout-5",
+ "WM:0", "WM:0", "WM:0",
+ "WM:2", "WM:2", "WM:2" ,
+ "WM:" + Long.MAX_VALUE, "WM:" + Long.MAX_VALUE, "WM:" + Long.MAX_VALUE),
sideOutputResultSink1.getSortedResult());
assertEquals(
- Arrays.asList("E:sideout-1", "E:sideout-2", "E:sideout-3", "E:sideout-4", "E:sideout-5", "WM:0", "WM:2", "WM:" + Long.MAX_VALUE),
+ Arrays.asList("E:sideout-1", "E:sideout-2", "E:sideout-3", "E:sideout-4", "E:sideout-5",
+ "WM:0", "WM:0", "WM:0",
+ "WM:2", "WM:2", "WM:2" ,
+ "WM:" + Long.MAX_VALUE, "WM:" + Long.MAX_VALUE, "WM:" + Long.MAX_VALUE),
sideOutputResultSink1.getSortedResult());
- assertEquals(Arrays.asList("E:1", "E:2", "E:3", "E:4", "E:5", "WM:0", "WM:2", "WM:" + Long.MAX_VALUE), resultSink.getSortedResult());
+ assertEquals(
+ Arrays.asList("E:1", "E:2", "E:3", "E:4", "E:5",
+ "WM:0", "WM:0", "WM:0",
+ "WM:2", "WM:2", "WM:2" ,
+ "WM:" + Long.MAX_VALUE, "WM:" + Long.MAX_VALUE, "WM:" + Long.MAX_VALUE),
+ resultSink.getSortedResult());
}
@Test
@@ -242,6 +253,44 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen
}
@Test
+ public void testDifferentSideOutputTypes() throws Exception {
+ final OutputTag<String> sideOutputTag1 = new OutputTag<String>("string"){};
+ final OutputTag<Integer> sideOutputTag2 = new OutputTag<Integer>("int"){};
+
+ TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
+ TestListResultSink<Integer> sideOutputResultSink2 = new TestListResultSink<>();
+ TestListResultSink<Integer> resultSink = new TestListResultSink<>();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
+ env.setParallelism(3);
+
+ DataStream<Integer> dataStream = env.fromCollection(elements);
+
+ SingleOutputStreamOperator<Integer> passThroughtStream = dataStream
+ .process(new ProcessFunction<Integer, Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement(
+ Integer value, Context ctx, Collector<Integer> out) throws Exception {
+ out.collect(value);
+ ctx.output(sideOutputTag1, "sideout-" + String.valueOf(value));
+ ctx.output(sideOutputTag2, 13);
+ }
+ });
+
+ passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);
+ passThroughtStream.getSideOutput(sideOutputTag2).addSink(sideOutputResultSink2);
+ passThroughtStream.addSink(resultSink);
+ env.execute();
+
+ assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"), sideOutputResultSink1.getSortedResult());
+ assertEquals(Arrays.asList(13, 13, 13, 13, 13), sideOutputResultSink2.getSortedResult());
+ assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
+ }
+
+ @Test
public void testSideOutputNameClash() throws Exception {
final OutputTag<String> sideOutputTag1 = new OutputTag<String>("side"){};
final OutputTag<Integer> sideOutputTag2 = new OutputTag<Integer>("side"){};
http://git-wip-us.apache.org/repos/asf/flink/blob/4651a169/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java
index 321d4c5..3fabb4b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java
@@ -17,6 +17,8 @@
package org.apache.flink.test.streaming.runtime.util;
+import java.util.Collections;
+import java.util.Comparator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -66,8 +68,8 @@ public class TestListResultSink<T> extends RichSinkFunction<T> {
public List<T> getSortedResult() {
synchronized (resultList()) {
- TreeSet<T> treeSet = new TreeSet<T>(resultList());
- ArrayList<T> sortedList = new ArrayList<T>(treeSet);
+ ArrayList<T> sortedList = new ArrayList<T>(resultList());
+ Collections.sort((List) sortedList);
return sortedList;
}
}