Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/04/12 03:26:38 UTC

[GitHub] [flink] yanghua commented on a change in pull request #7470: [FLINK-11283] Accessing the key when processing connected keyed stream

URL: https://github.com/apache/flink/pull/7470#discussion_r274748632
 
 

 ##########
 File path: flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
 ##########
 @@ -591,6 +638,114 @@ public void processElement2(Integer value, Context ctx, Collector<Integer> out)
 		assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
 	}
 
+	/**
+	 * Test keyed KeyedCoProcessFunction side output with multiple consumers.
+	 */
+	@Test
+	public void testRealKeyedCoProcessFunctionSideOutputWithMultipleConsumers() throws Exception {
+		final OutputTag<String> sideOutputTag1 = new OutputTag<String>("side1"){};
+		final OutputTag<String> sideOutputTag2 = new OutputTag<String>("side2"){};
+
+		TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
+		TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
+		TestListResultSink<Integer> resultSink = new TestListResultSink<>();
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(3);
+
+		DataStream<Integer> ds1 = see.fromCollection(elements);
+		DataStream<Integer> ds2 = see.fromCollection(elements);
+
+		SingleOutputStreamOperator<Integer> passThroughtStream = ds1
+			.keyBy(i -> i)
+			.connect(ds2.keyBy(i -> i))
+			.process(new KeyedCoProcessFunction<Integer, Integer, Integer, Integer>() {
+				@Override
+				public void processElement1(Integer value, Context ctx, Collector<Integer> out)
+					throws Exception {
+					if (value < 4) {
+						out.collect(value);
+						ctx.output(sideOutputTag1, "sideout1-" + ctx.getCurrentKey() + "-" + String.valueOf(value));
+					}
+				}
+
+				@Override
+				public void processElement2(Integer value, Context ctx, Collector<Integer> out)
+					throws Exception {
+					if (value >= 4) {
+						out.collect(value);
+						ctx.output(sideOutputTag2, "sideout2-" + ctx.getCurrentKey() + "-" + String.valueOf(value));
+					}
+				}
+			});
+
+		passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);
+		passThroughtStream.getSideOutput(sideOutputTag2).addSink(sideOutputResultSink2);
+		passThroughtStream.addSink(resultSink);
+		see.execute();
+
+		assertEquals(Arrays.asList("sideout1-1-1", "sideout1-2-2", "sideout1-3-3"), sideOutputResultSink1.getSortedResult());
+		assertEquals(Arrays.asList("sideout2-4-4", "sideout2-5-5"), sideOutputResultSink2.getSortedResult());
+		assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
+	}
+
+	/**
+	 * Test keyed KeyedCoProcessFunction side output with multiple consumers.
+	 */
+	@Test
+	public void testRealKeyedCoProcessFunctionSideOutputWithMultipleConsumersAndDifferentTypes() throws Exception {
 
 Review comment:
   Yes, I was trying to test whether a stream with a tuple input type can invoke `keyBy` and `connect`.
   OK, let me remove this test.
