You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/10/24 12:10:31 UTC
flink git commit: [FLINK-7864] Support side-outputs in CoProcessFunction
Repository: flink
Updated Branches:
refs/heads/master a292b2182 -> 35ad5396c
[FLINK-7864] Support side-outputs in CoProcessFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35ad5396
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35ad5396
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35ad5396
Branch: refs/heads/master
Commit: 35ad5396caa533735070aa514134f08c3e3ecee1
Parents: a292b21
Author: Bowen Li <bo...@gmail.com>
Authored: Sun Oct 22 16:57:43 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Oct 24 14:01:20 2017 +0200
----------------------------------------------------------------------
docs/dev/stream/side_output.md | 1 +
.../api/functions/co/CoProcessFunction.java | 9 +
.../api/operators/KeyedProcessOperator.java | 4 +
.../api/operators/co/CoProcessOperator.java | 10 +
.../operators/co/KeyedCoProcessOperator.java | 24 ++-
.../streaming/runtime/SideOutputITCase.java | 187 +++++++++++++++++++
6 files changed, 232 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/35ad5396/docs/dev/stream/side_output.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/side_output.md b/docs/dev/stream/side_output.md
index 39ead7e..c694e47 100644
--- a/docs/dev/stream/side_output.md
+++ b/docs/dev/stream/side_output.md
@@ -58,6 +58,7 @@ contains.
Emitting data to a side output is possible from the following functions:
- [ProcessFunction]({{ site.baseurl }}/dev/stream/operators/process_function.html)
+- CoProcessFunction
- [ProcessWindowFunction]({{ site.baseurl }}/dev/windows.html#processwindowfunction)
- ProcessAllWindowFunction
http://git-wip-us.apache.org/repos/asf/flink/blob/35ad5396/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
index 89a52d2..20c1084 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
/**
* A function that processes elements of two streams and produces a single output one.
@@ -116,6 +117,14 @@ public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunct
* A {@link TimerService} for querying time and registering timers.
*/
public abstract TimerService timerService();
+
+ /**
+ * Emits a record to the side output identified by the {@link OutputTag}.
+ *
+ * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
+ * @param value The record to emit.
+ */
+ public abstract <X> void output(OutputTag<X> outputTag, X value);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/35ad5396/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
index 5537b5e..7d7ee86 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
@@ -158,6 +158,10 @@ public class KeyedProcessOperator<K, IN, OUT>
@Override
public <X> void output(OutputTag<X> outputTag, X value) {
+ if (outputTag == null) {
+ throw new IllegalArgumentException("OutputTag must not be null.");
+ }
+
output.collect(outputTag, new StreamRecord<>(value, timer.getTimestamp()));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/35ad5396/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
index 63ddb3f..332b92b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.OutputTag;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -131,5 +132,14 @@ public class CoProcessOperator<IN1, IN2, OUT>
public TimerService timerService() {
return this;
}
+
+ @Override
+ public <X> void output(OutputTag<X> outputTag, X value) {
+ if (outputTag == null) {
+ throw new IllegalArgumentException("OutputTag must not be null.");
+ }
+
+ output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/35ad5396/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
index d125a79..d53e6e8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
@@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -110,7 +111,7 @@ public class KeyedCoProcessOperator<K, IN1, IN2, OUT>
return collector;
}
- private static class ContextImpl<IN1, IN2, OUT> extends CoProcessFunction<IN1, IN2, OUT>.Context {
+ private class ContextImpl<IN1, IN2, OUT> extends CoProcessFunction<IN1, IN2, OUT>.Context {
private final TimerService timerService;
@@ -136,10 +137,18 @@ public class KeyedCoProcessOperator<K, IN1, IN2, OUT>
public TimerService timerService() {
return timerService;
}
+
+ @Override
+ public <X> void output(OutputTag<X> outputTag, X value) {
+ if (outputTag == null) {
+ throw new IllegalArgumentException("OutputTag must not be null.");
+ }
+
+ output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
+ }
}
- private static class OnTimerContextImpl<IN1, IN2, OUT>
- extends CoProcessFunction<IN1, IN2, OUT>.OnTimerContext {
+ private class OnTimerContextImpl<IN1, IN2, OUT> extends CoProcessFunction<IN1, IN2, OUT>.OnTimerContext {
private final TimerService timerService;
@@ -168,5 +177,14 @@ public class KeyedCoProcessOperator<K, IN1, IN2, OUT>
public TimerService timerService() {
return timerService;
}
+
+ @Override
+ public <X> void output(OutputTag<X> outputTag, X value) {
+ if (outputTag == null) {
+ throw new IllegalArgumentException("OutputTag must not be null.");
+ }
+
+ output.collect(outputTag, new StreamRecord<>(value, timer.getTimestamp()));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/35ad5396/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
index 7f3fe8b..1b07dbd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
@@ -362,6 +363,98 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen
}
/**
+ * Test CoProcessFunction side output.
+ */
+ @Test
+ public void testCoProcessFunctionSideOutput() throws Exception {
+ final OutputTag<String> sideOutputTag = new OutputTag<String>("side"){};
+
+ TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
+ TestListResultSink<Integer> resultSink = new TestListResultSink<>();
+
+ StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+ see.setParallelism(3);
+
+ DataStream<Integer> ds1 = see.fromCollection(elements);
+ DataStream<Integer> ds2 = see.fromCollection(elements);
+
+ SingleOutputStreamOperator<Integer> passThroughtStream = ds1
+ .connect(ds2)
+ .process(new CoProcessFunction<Integer, Integer, Integer>() {
+ @Override
+ public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+ if (value < 3) {
+ out.collect(value);
+ ctx.output(sideOutputTag, "sideout1-" + String.valueOf(value));
+ }
+ }
+
+ @Override
+ public void processElement2(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+ if (value >= 3) {
+ out.collect(value);
+ ctx.output(sideOutputTag, "sideout2-" + String.valueOf(value));
+ }
+ }
+ });
+
+ passThroughtStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
+ passThroughtStream.addSink(resultSink);
+ see.execute();
+
+ assertEquals(Arrays.asList("sideout1-1", "sideout1-2", "sideout2-3", "sideout2-4", "sideout2-5"), sideOutputResultSink.getSortedResult());
+ assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
+ }
+
+ /**
+ * Test CoProcessFunction side output with multiple consumers.
+ */
+ @Test
+ public void testCoProcessFunctionSideOutputWithMultipleConsumers() throws Exception {
+ final OutputTag<String> sideOutputTag1 = new OutputTag<String>("side1"){};
+ final OutputTag<String> sideOutputTag2 = new OutputTag<String>("side2"){};
+
+ TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
+ TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
+ TestListResultSink<Integer> resultSink = new TestListResultSink<>();
+
+ StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+ see.setParallelism(3);
+
+ DataStream<Integer> ds1 = see.fromCollection(elements);
+ DataStream<Integer> ds2 = see.fromCollection(elements);
+
+ SingleOutputStreamOperator<Integer> passThroughtStream = ds1
+ .connect(ds2)
+ .process(new CoProcessFunction<Integer, Integer, Integer>() {
+ @Override
+ public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+ if (value < 4) {
+ out.collect(value);
+ ctx.output(sideOutputTag1, "sideout1-" + String.valueOf(value));
+ }
+ }
+
+ @Override
+ public void processElement2(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+ if (value >= 4) {
+ out.collect(value);
+ ctx.output(sideOutputTag2, "sideout2-" + String.valueOf(value));
+ }
+ }
+ });
+
+ passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);
+ passThroughtStream.getSideOutput(sideOutputTag2).addSink(sideOutputResultSink2);
+ passThroughtStream.addSink(resultSink);
+ see.execute();
+
+ assertEquals(Arrays.asList("sideout1-1", "sideout1-2", "sideout1-3"), sideOutputResultSink1.getSortedResult());
+ assertEquals(Arrays.asList("sideout2-4", "sideout2-5"), sideOutputResultSink2.getSortedResult());
+ assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
+ }
+
+ /**
* Test keyed ProcessFunction side output.
*/
@Test
@@ -405,6 +498,100 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen
}
/**
+ * Test keyed CoProcessFunction side output.
+ */
+ @Test
+ public void testKeyedCoProcessFunctionSideOutput() throws Exception {
+ final OutputTag<String> sideOutputTag = new OutputTag<String>("side"){};
+
+ TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
+ TestListResultSink<Integer> resultSink = new TestListResultSink<>();
+
+ StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+ see.setParallelism(3);
+
+ DataStream<Integer> ds1 = see.fromCollection(elements);
+ DataStream<Integer> ds2 = see.fromCollection(elements);
+
+ SingleOutputStreamOperator<Integer> passThroughtStream = ds1
+ .keyBy(i -> i)
+ .connect(ds2.keyBy(i -> i))
+ .process(new CoProcessFunction<Integer, Integer, Integer>() {
+ @Override
+ public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+ if (value < 3) {
+ out.collect(value);
+ ctx.output(sideOutputTag, "sideout1-" + String.valueOf(value));
+ }
+ }
+
+ @Override
+ public void processElement2(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+ if (value >= 3) {
+ out.collect(value);
+ ctx.output(sideOutputTag, "sideout2-" + String.valueOf(value));
+ }
+ }
+ });
+
+ passThroughtStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
+ passThroughtStream.addSink(resultSink);
+ see.execute();
+
+ assertEquals(Arrays.asList("sideout1-1", "sideout1-2", "sideout2-3", "sideout2-4", "sideout2-5"), sideOutputResultSink.getSortedResult());
+ assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
+ }
+
+ /**
+ * Test keyed CoProcessFunction side output with multiple consumers.
+ */
+ @Test
+ public void testKeyedCoProcessFunctionSideOutputWithMultipleConsumers() throws Exception {
+ final OutputTag<String> sideOutputTag1 = new OutputTag<String>("side1"){};
+ final OutputTag<String> sideOutputTag2 = new OutputTag<String>("side2"){};
+
+ TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
+ TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
+ TestListResultSink<Integer> resultSink = new TestListResultSink<>();
+
+ StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+ see.setParallelism(3);
+
+ DataStream<Integer> ds1 = see.fromCollection(elements);
+ DataStream<Integer> ds2 = see.fromCollection(elements);
+
+ SingleOutputStreamOperator<Integer> passThroughtStream = ds1
+ .keyBy(i -> i)
+ .connect(ds2.keyBy(i -> i))
+ .process(new CoProcessFunction<Integer, Integer, Integer>() {
+ @Override
+ public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+ if (value < 4) {
+ out.collect(value);
+ ctx.output(sideOutputTag1, "sideout1-" + String.valueOf(value));
+ }
+ }
+
+ @Override
+ public void processElement2(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+ if (value >= 4) {
+ out.collect(value);
+ ctx.output(sideOutputTag2, "sideout2-" + String.valueOf(value));
+ }
+ }
+ });
+
+ passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);
+ passThroughtStream.getSideOutput(sideOutputTag2).addSink(sideOutputResultSink2);
+ passThroughtStream.addSink(resultSink);
+ see.execute();
+
+ assertEquals(Arrays.asList("sideout1-1", "sideout1-2", "sideout1-3"), sideOutputResultSink1.getSortedResult());
+ assertEquals(Arrays.asList("sideout2-4", "sideout2-5"), sideOutputResultSink2.getSortedResult());
+ assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
+ }
+
+ /**
* Test ProcessFunction side outputs with wrong {@code OutputTag}.
*/
@Test