You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/10/24 12:10:31 UTC

flink git commit: [FLINK-7864] Support side-outputs in CoProcessFunction

Repository: flink
Updated Branches:
  refs/heads/master a292b2182 -> 35ad5396c


[FLINK-7864] Support side-outputs in CoProcessFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35ad5396
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35ad5396
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35ad5396

Branch: refs/heads/master
Commit: 35ad5396caa533735070aa514134f08c3e3ecee1
Parents: a292b21
Author: Bowen Li <bo...@gmail.com>
Authored: Sun Oct 22 16:57:43 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Oct 24 14:01:20 2017 +0200

----------------------------------------------------------------------
 docs/dev/stream/side_output.md                  |   1 +
 .../api/functions/co/CoProcessFunction.java     |   9 +
 .../api/operators/KeyedProcessOperator.java     |   4 +
 .../api/operators/co/CoProcessOperator.java     |  10 +
 .../operators/co/KeyedCoProcessOperator.java    |  24 ++-
 .../streaming/runtime/SideOutputITCase.java     | 187 +++++++++++++++++++
 6 files changed, 232 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/35ad5396/docs/dev/stream/side_output.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/side_output.md b/docs/dev/stream/side_output.md
index 39ead7e..c694e47 100644
--- a/docs/dev/stream/side_output.md
+++ b/docs/dev/stream/side_output.md
@@ -58,6 +58,7 @@ contains.
 Emitting data to a side output is possible from the following functions:
 
 - [ProcessFunction]({{ site.baseurl }}/dev/stream/operators/process_function.html)
+- CoProcessFunction
 - [ProcessWindowFunction]({{ site.baseurl }}/dev/windows.html#processwindowfunction)
 - ProcessAllWindowFunction
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35ad5396/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
index 89a52d2..20c1084 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 
 /**
  * A function that processes elements of two streams and produces a single output one.
@@ -116,6 +117,14 @@ public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunct
 		 * A {@link TimerService} for querying time and registering timers.
 		 */
 		public abstract TimerService timerService();
+
+		/**
+		 * Emits a record to the side output identified by the {@link OutputTag}.
+		 *
+		 * @param outputTag The {@code OutputTag} that identifies the side output to emit to.
+		 * @param value The record to emit.
+		 */
+		public abstract <X> void output(OutputTag<X> outputTag, X value);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/35ad5396/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
index 5537b5e..7d7ee86 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
@@ -158,6 +158,10 @@ public class KeyedProcessOperator<K, IN, OUT>
 
 		@Override
 		public <X> void output(OutputTag<X> outputTag, X value) {
+			if (outputTag == null) {
+				throw new IllegalArgumentException("OutputTag must not be null.");
+			}
+
 			output.collect(outputTag, new StreamRecord<>(value, timer.getTimestamp()));
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35ad5396/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
index 63ddb3f..332b92b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.OutputTag;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -131,5 +132,14 @@ public class CoProcessOperator<IN1, IN2, OUT>
 		public TimerService timerService() {
 			return this;
 		}
+
+		@Override
+		public <X> void output(OutputTag<X> outputTag, X value) {
+			if (outputTag == null) {
+				throw new IllegalArgumentException("OutputTag must not be null.");
+			}
+
+			output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35ad5396/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
index d125a79..d53e6e8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
@@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -110,7 +111,7 @@ public class KeyedCoProcessOperator<K, IN1, IN2, OUT>
 		return collector;
 	}
 
-	private static class ContextImpl<IN1, IN2, OUT> extends CoProcessFunction<IN1, IN2, OUT>.Context {
+	private class ContextImpl<IN1, IN2, OUT> extends CoProcessFunction<IN1, IN2, OUT>.Context {
 
 		private final TimerService timerService;
 
@@ -136,10 +137,18 @@ public class KeyedCoProcessOperator<K, IN1, IN2, OUT>
 		public TimerService timerService() {
 			return timerService;
 		}
+
+		@Override
+		public <X> void output(OutputTag<X> outputTag, X value) {
+			if (outputTag == null) {
+				throw new IllegalArgumentException("OutputTag must not be null.");
+			}
+
+			output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
+		}
 	}
 
-	private static class OnTimerContextImpl<IN1, IN2, OUT>
-			extends CoProcessFunction<IN1, IN2, OUT>.OnTimerContext {
+	private class OnTimerContextImpl<IN1, IN2, OUT> extends CoProcessFunction<IN1, IN2, OUT>.OnTimerContext {
 
 		private final TimerService timerService;
 
@@ -168,5 +177,14 @@ public class KeyedCoProcessOperator<K, IN1, IN2, OUT>
 		public TimerService timerService() {
 			return timerService;
 		}
+
+		@Override
+		public <X> void output(OutputTag<X> outputTag, X value) {
+			if (outputTag == null) {
+				throw new IllegalArgumentException("OutputTag must not be null.");
+			}
+
+			output.collect(outputTag, new StreamRecord<>(value, timer.getTimestamp()));
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35ad5396/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
index 7f3fe8b..1b07dbd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
@@ -362,6 +363,98 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen
 	}
 
 	/**
+	 * Tests that a non-keyed CoProcessFunction can emit to a side output from both of its inputs.
+	 */
+	@Test
+	public void testCoProcessFunctionSideOutput() throws Exception {
+		final OutputTag<String> sideOutputTag = new OutputTag<String>("side"){};
+
+		TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
+		TestListResultSink<Integer> resultSink = new TestListResultSink<>();
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(3);
+
+		DataStream<Integer> ds1 = see.fromCollection(elements);
+		DataStream<Integer> ds2 = see.fromCollection(elements);
+
+		SingleOutputStreamOperator<Integer> passThroughtStream = ds1
+				.connect(ds2)
+				.process(new CoProcessFunction<Integer, Integer, Integer>() {
+					@Override
+					public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+						if (value < 3) {
+							out.collect(value);
+							ctx.output(sideOutputTag, "sideout1-" + String.valueOf(value));
+						}
+					}
+
+					@Override
+					public void processElement2(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+						if (value >= 3) {
+							out.collect(value);
+							ctx.output(sideOutputTag, "sideout2-" + String.valueOf(value));
+						}
+					}
+				});
+
+		passThroughtStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
+		passThroughtStream.addSink(resultSink);
+		see.execute();
+
+		assertEquals(Arrays.asList("sideout1-1", "sideout1-2", "sideout2-3", "sideout2-4", "sideout2-5"), sideOutputResultSink.getSortedResult());
+		assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
+	}
+
+	/**
+	 * Tests that a non-keyed CoProcessFunction can emit to two distinct side outputs, each with its own sink.
+	 */
+	@Test
+	public void testCoProcessFunctionSideOutputWithMultipleConsumers() throws Exception {
+		final OutputTag<String> sideOutputTag1 = new OutputTag<String>("side1"){};
+		final OutputTag<String> sideOutputTag2 = new OutputTag<String>("side2"){};
+
+		TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
+		TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
+		TestListResultSink<Integer> resultSink = new TestListResultSink<>();
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(3);
+
+		DataStream<Integer> ds1 = see.fromCollection(elements);
+		DataStream<Integer> ds2 = see.fromCollection(elements);
+
+		SingleOutputStreamOperator<Integer> passThroughtStream = ds1
+				.connect(ds2)
+				.process(new CoProcessFunction<Integer, Integer, Integer>() {
+					@Override
+					public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+						if (value < 4) {
+							out.collect(value);
+							ctx.output(sideOutputTag1, "sideout1-" + String.valueOf(value));
+						}
+					}
+
+					@Override
+					public void processElement2(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+						if (value >= 4) {
+							out.collect(value);
+							ctx.output(sideOutputTag2, "sideout2-" + String.valueOf(value));
+						}
+					}
+				});
+
+		passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);
+		passThroughtStream.getSideOutput(sideOutputTag2).addSink(sideOutputResultSink2);
+		passThroughtStream.addSink(resultSink);
+		see.execute();
+
+		assertEquals(Arrays.asList("sideout1-1", "sideout1-2", "sideout1-3"), sideOutputResultSink1.getSortedResult());
+		assertEquals(Arrays.asList("sideout2-4", "sideout2-5"), sideOutputResultSink2.getSortedResult());
+		assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
+	}
+
+	/**
 	 * Test keyed ProcessFunction side output.
 	 */
 	@Test
@@ -405,6 +498,100 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen
 	}
 
 	/**
+	 * Tests that a keyed CoProcessFunction can emit to a side output from both of its inputs.
+	 */
+	@Test
+	public void testKeyedCoProcessFunctionSideOutput() throws Exception {
+		final OutputTag<String> sideOutputTag = new OutputTag<String>("side"){};
+
+		TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
+		TestListResultSink<Integer> resultSink = new TestListResultSink<>();
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(3);
+
+		DataStream<Integer> ds1 = see.fromCollection(elements);
+		DataStream<Integer> ds2 = see.fromCollection(elements);
+
+		SingleOutputStreamOperator<Integer> passThroughtStream = ds1
+				.keyBy(i -> i)
+				.connect(ds2.keyBy(i -> i))
+				.process(new CoProcessFunction<Integer, Integer, Integer>() {
+					@Override
+					public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+						if (value < 3) {
+							out.collect(value);
+							ctx.output(sideOutputTag, "sideout1-" + String.valueOf(value));
+						}
+					}
+
+					@Override
+					public void processElement2(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+						if (value >= 3) {
+							out.collect(value);
+							ctx.output(sideOutputTag, "sideout2-" + String.valueOf(value));
+						}
+					}
+				});
+
+		passThroughtStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
+		passThroughtStream.addSink(resultSink);
+		see.execute();
+
+		assertEquals(Arrays.asList("sideout1-1", "sideout1-2", "sideout2-3", "sideout2-4", "sideout2-5"), sideOutputResultSink.getSortedResult());
+		assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
+	}
+
+	/**
+	 * Tests that a keyed CoProcessFunction can emit to two distinct side outputs, each with its own sink.
+	 */
+	@Test
+	public void testKeyedCoProcessFunctionSideOutputWithMultipleConsumers() throws Exception {
+		final OutputTag<String> sideOutputTag1 = new OutputTag<String>("side1"){};
+		final OutputTag<String> sideOutputTag2 = new OutputTag<String>("side2"){};
+
+		TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
+		TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
+		TestListResultSink<Integer> resultSink = new TestListResultSink<>();
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(3);
+
+		DataStream<Integer> ds1 = see.fromCollection(elements);
+		DataStream<Integer> ds2 = see.fromCollection(elements);
+
+		SingleOutputStreamOperator<Integer> passThroughtStream = ds1
+				.keyBy(i -> i)
+				.connect(ds2.keyBy(i -> i))
+				.process(new CoProcessFunction<Integer, Integer, Integer>() {
+					@Override
+					public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+						if (value < 4) {
+							out.collect(value);
+							ctx.output(sideOutputTag1, "sideout1-" + String.valueOf(value));
+						}
+					}
+
+					@Override
+					public void processElement2(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+						if (value >= 4) {
+							out.collect(value);
+							ctx.output(sideOutputTag2, "sideout2-" + String.valueOf(value));
+						}
+					}
+				});
+
+		passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);
+		passThroughtStream.getSideOutput(sideOutputTag2).addSink(sideOutputResultSink2);
+		passThroughtStream.addSink(resultSink);
+		see.execute();
+
+		assertEquals(Arrays.asList("sideout1-1", "sideout1-2", "sideout1-3"), sideOutputResultSink1.getSortedResult());
+		assertEquals(Arrays.asList("sideout2-4", "sideout2-5"), sideOutputResultSink2.getSortedResult());
+		assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
+	}
+
+	/**
 	 * Test ProcessFunction side outputs with wrong {@code OutputTag}.
 	 */
 	@Test