You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/18 07:13:49 UTC

[10/11] flink git commit: [FLINK-4460] Add side output ITCase for multiple side out consumers

[FLINK-4460] Add side output ITCase for multiple side out consumers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/709edb80
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/709edb80
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/709edb80

Branch: refs/heads/master
Commit: 709edb80b8fca2d669f960b7daf983933a8a33c5
Parents: c3fc8b9
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Mar 10 17:59:36 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Mar 18 07:44:17 2017 +0100

----------------------------------------------------------------------
 .../streaming/runtime/SideOutputITCase.java     | 67 ++++++++++++++++++++
 1 file changed, 67 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/709edb80/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
index b52c1f1..44ba576 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
@@ -165,6 +165,73 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen
 		assertEquals(Arrays.asList("E:1", "E:2", "E:3", "E:4", "E:5", "WM:0", "WM:2", "WM:" + Long.MAX_VALUE), resultSink.getSortedResult());
 	}
 
+	@Test
+	public void testSideOutputWithMultipleConsumers() throws Exception {
+		TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
+		TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
+		TestListResultSink<Integer> resultSink = new TestListResultSink<>();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(3);
+
+		DataStream<Integer> dataStream = env.fromCollection(elements);
+
+		SingleOutputStreamOperator<Integer> passThroughtStream = dataStream
+				.process(new ProcessFunction<Integer, Integer>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void processElement(
+							Integer value, Context ctx, Collector<Integer> out) throws Exception {
+						out.collect(value);
+						ctx.output(sideOutputTag1, "sideout-" + String.valueOf(value));
+					}
+				});
+
+		passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);
+		passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink2);
+		passThroughtStream.addSink(resultSink);
+		env.execute();
+
+		assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"), sideOutputResultSink1.getSortedResult());
+		assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"), sideOutputResultSink2.getSortedResult());
+		assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
+	}
+
+	@Test
+	public void testSideOutputWithMultipleConsumersWithObjectReuse() throws Exception {
+		TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
+		TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
+		TestListResultSink<Integer> resultSink = new TestListResultSink<>();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+		env.setParallelism(3);
+
+		DataStream<Integer> dataStream = env.fromCollection(elements);
+
+		SingleOutputStreamOperator<Integer> passThroughtStream = dataStream
+				.process(new ProcessFunction<Integer, Integer>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void processElement(
+							Integer value, Context ctx, Collector<Integer> out) throws Exception {
+						out.collect(value);
+						ctx.output(sideOutputTag1, "sideout-" + String.valueOf(value));
+					}
+				});
+
+		passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);
+		passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink2);
+		passThroughtStream.addSink(resultSink);
+		env.execute();
+
+		assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"), sideOutputResultSink1.getSortedResult());
+		assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"), sideOutputResultSink2.getSortedResult());
+		assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
+	}
+
 	/**
 	 * Test ProcessFunction side output.
 	 */