You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/18 07:13:42 UTC
[03/11] flink git commit: [FLINK-4460] Add side outputs for
ProcessFunction
[FLINK-4460] Add side outputs for ProcessFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f828657d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f828657d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f828657d
Branch: refs/heads/master
Commit: f828657dfe2fa96c87c3545cb0aa679d67587862
Parents: d406915
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 21 12:38:04 2016 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Mar 18 07:44:17 2017 +0100
----------------------------------------------------------------------
.../examples/sideoutput/SideOutputExample.java | 151 +++++++++++++++++++
.../api/functions/ProcessFunction.java | 9 ++
.../api/operators/KeyedProcessOperator.java | 23 ++-
.../api/operators/ProcessOperator.java | 6 +
.../streaming/runtime/SideOutputITCase.java | 146 ++++++++++++++++++
5 files changed, 329 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f828657d/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
new file mode 100644
index 0000000..bfd3cd7
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.sideoutput;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+/**
+ * An example that illustrates the use of side outputs.
+ *
+ * <p>This is a modified version of {@link org.apache.flink.streaming.examples.windowing.WindowWordCount}
+ * that has a filter in the tokenizer and only emits some words for counting
+ * while emitting the other words to a side output.
+ */
+public class SideOutputExample {
+
+ /**
+ * We need to create an {@link OutputTag} so that we can reference it when emitting
+ * data to a side output and also to retrieve the side output stream from an operation.
+ */
+ static final OutputTag<String> rejectedWordsTag = new OutputTag<String>("rejected") {};
+
+ /**
+ * Builds and runs the side-output word count job.
+ *
+ * <p>Input lines are tokenized by {@link Tokenizer}; accepted words are counted in
+ * 5 second tumbling windows while rejected words are read back from the side output
+ * identified by {@code rejectedWordsTag}.
+ *
+ * <p>Recognized arguments: {@code --input <path>}, {@code --output <path>} and
+ * {@code --rejected-words-output <path>}. Each result stream is written to its own
+ * path when that argument is present and printed to stdout otherwise.
+ *
+ * @param args command line arguments, parsed with {@link ParameterTool}
+ * @throws Exception if building or executing the job fails
+ */
+ public static void main(String[] args) throws Exception {
+
+ // Checking input parameters
+ final ParameterTool params = ParameterTool.fromArgs(args);
+
+ // set up the execution environment
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ // make parameters available in the web interface
+ env.getConfig().setGlobalJobParameters(params);
+
+ // get input data
+ DataStream<String> text;
+ if (params.has("input")) {
+ // read the text file from given input path
+ text = env.readTextFile(params.get("input"));
+ } else {
+ System.out.println("Executing WordCount example with default input data set.");
+ System.out.println("Use --input to specify file input.");
+ // get default test text data
+ text = env.fromElements(WordCountData.WORDS);
+ }
+
+ // every line is mapped to the same constant key (0) before it is tokenized
+ SingleOutputStreamOperator<Tuple2<String, Integer>> tokenized = text
+ .keyBy(new KeySelector<String, Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer getKey(String value) throws Exception {
+ return 0;
+ }
+ })
+ .process(new Tokenizer());
+
+ // words the tokenizer diverted to the side output, prefixed for readability
+ DataStream<String> rejectedWords = tokenized
+ .getSideOutput(rejectedWordsTag)
+ .map(new MapFunction<String, String>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String map(String value) throws Exception {
+ return "rejected: " + value;
+ }
+ });
+
+ // group by the tuple field "0" and sum up tuple field "1"
+ DataStream<Tuple2<String, Integer>> counts = tokenized
+ .keyBy(0)
+ .window(TumblingEventTimeWindows.of(Time.seconds(5)))
+ .sum(1);
+
+ // emit result; the two streams are handled independently so that a missing
+ // --rejected-words-output never hands a null path to writeAsText()
+ if (params.has("output")) {
+ counts.writeAsText(params.get("output"));
+ } else {
+ System.out.println("Printing result to stdout. Use --output to specify output path.");
+ counts.print();
+ }
+
+ if (params.has("rejected-words-output")) {
+ rejectedWords.writeAsText(params.get("rejected-words-output"));
+ } else {
+ System.out.println("Printing rejected words to stdout. Use --rejected-words-output to specify output path.");
+ rejectedWords.print();
+ }
+
+ // execute program
+ env.execute("Streaming WordCount SideOutput");
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+ * Implements the string tokenizer that splits sentences into words as a
+ * user-defined FlatMapFunction. The function takes a line (String) and
+ * splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2<String,
+ * Integer>}).
+ *
+ * <p>This rejects words that are longer than 5 characters long.
+ */
+ public static final class Tokenizer extends ProcessFunction<String, Tuple2<String, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement(
+ String value,
+ Context ctx,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ // normalize and split the line
+ String[] tokens = value.toLowerCase().split("\\W+");
+
+ // emit the pairs
+ for (String token : tokens) {
+ if (token.length() > 5) {
+ ctx.output(rejectedWordsTag, token);
+ } else if (token.length() > 0) {
+ out.collect(new Tuple2<>(token, 1));
+ }
+ }
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f828657d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
index 48418af..43683ef 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
/**
* A function that processes elements of a stream.
@@ -101,6 +102,14 @@ public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
* A {@link TimerService} for querying time and registering timers.
*/
public abstract TimerService timerService();
+
+ /**
+ * Emits a record to the side output identified by the {@link OutputTag}.
+ *
+ * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
+ * @param value The record to emit.
+ * @param <X> the type of the record, which must match the type parameter of the {@code OutputTag}.
+ */
+ public abstract <X> void output(OutputTag<X> outputTag, X value);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/f828657d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
index d8dfb0f..97bc8ce 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -38,9 +39,9 @@ public class KeyedProcessOperator<K, IN, OUT>
private transient TimestampedCollector<OUT> collector;
- private transient ContextImpl<IN, OUT> context;
+ private transient ContextImpl context;
- private transient OnTimerContextImpl<IN, OUT> onTimerContext;
+ private transient OnTimerContextImpl onTimerContext;
public KeyedProcessOperator(ProcessFunction<IN, OUT> function) {
super(function);
@@ -58,8 +59,8 @@ public class KeyedProcessOperator<K, IN, OUT>
TimerService timerService = new SimpleTimerService(internalTimerService);
- context = new ContextImpl<>(userFunction, timerService);
- onTimerContext = new OnTimerContextImpl<>(userFunction, timerService);
+ context = new ContextImpl(userFunction, timerService);
+ onTimerContext = new OnTimerContextImpl(userFunction, timerService);
}
@Override
@@ -90,7 +91,7 @@ public class KeyedProcessOperator<K, IN, OUT>
context.element = null;
}
- private static class ContextImpl<IN, OUT> extends ProcessFunction<IN, OUT>.Context {
+ private class ContextImpl extends ProcessFunction<IN, OUT>.Context {
private final TimerService timerService;
@@ -113,12 +114,17 @@ public class KeyedProcessOperator<K, IN, OUT>
}
@Override
+ public <X> void output(OutputTag<X> outputTag, X value) {
+ // wrap the value in a record that carries the timestamp of the element being processed
+ StreamRecord<X> sideRecord = new StreamRecord<>(value, element.getTimestamp());
+ output.collect(outputTag, sideRecord);
+ }
+
+ @Override
public TimerService timerService() {
return timerService;
}
}
- private static class OnTimerContextImpl<IN, OUT> extends ProcessFunction<IN, OUT>.OnTimerContext{
+ private class OnTimerContextImpl extends ProcessFunction<IN, OUT>.OnTimerContext{
private final TimerService timerService;
@@ -144,6 +150,11 @@ public class KeyedProcessOperator<K, IN, OUT>
}
@Override
+ public <X> void output(OutputTag<X> outputTag, X value) {
+ // wrap the value in a record that carries the timestamp of the firing timer
+ StreamRecord<X> sideRecord = new StreamRecord<>(value, timer.getTimestamp());
+ output.collect(outputTag, sideRecord);
+ }
+
+ @Override
public TimerService timerService() {
return timerService;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f828657d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
index 13b68f4..f73b610 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.OutputTag;
import static org.apache.flink.util.Preconditions.checkState;
@@ -93,6 +94,11 @@ public class ProcessOperator<IN, OUT>
}
@Override
+ public <X> void output(OutputTag<X> outputTag, X value) {
+ // wrap the value in a record that carries the timestamp of the element being processed
+ StreamRecord<X> sideRecord = new StreamRecord<>(value, element.getTimestamp());
+ output.collect(outputTag, sideRecord);
+ }
+
+ @Override
public long currentProcessingTime() {
return processingTimeService.getCurrentProcessingTime();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f828657d/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
index 2f92897..f87ee30 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.test.streaming.runtime;
import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.OutputTag;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -57,6 +58,151 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase {
elements.add(4);
}
+ private final static OutputTag<String> sideOutputTag = new OutputTag<String>("side"){};
+ private final static OutputTag<String> otherSideOutputTag = new OutputTag<String>("other-side"){};
+
+ /**
+ * Verifies that a non-keyed {@code ProcessFunction} can emit to a side output
+ * while still forwarding every element on its main output.
+ */
+ @Test
+ public void testProcessFunctionSideOutput() throws Exception {
+ TestListResultSink<String> sideSink = new TestListResultSink<>();
+ TestListResultSink<Integer> mainSink = new TestListResultSink<>();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(3);
+
+ // each element is passed through unchanged and mirrored as a string on the side output
+ SingleOutputStreamOperator<Integer> passThroughStream = env
+ .fromCollection(elements)
+ .process(new ProcessFunction<Integer, Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement(
+ Integer value, Context ctx, Collector<Integer> out) throws Exception {
+ out.collect(value);
+ ctx.output(sideOutputTag, "sideout-" + value);
+ }
+ });
+
+ passThroughStream.getSideOutput(sideOutputTag).addSink(sideSink);
+ passThroughStream.addSink(mainSink);
+ env.execute();
+
+ assertEquals(
+ Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"),
+ sideSink.getSortedResult());
+ assertEquals(
+ Arrays.asList(1, 2, 3, 4, 5),
+ mainSink.getSortedResult());
+ }
+
+ /**
+ * Verifies that a {@code ProcessFunction} applied on a keyed stream can emit to a
+ * side output while still forwarding every element on its main output.
+ */
+ @Test
+ public void testKeyedProcessFunctionSideOutput() throws Exception {
+ TestListResultSink<String> sideSink = new TestListResultSink<>();
+ TestListResultSink<Integer> mainSink = new TestListResultSink<>();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(3);
+
+ // key each element by its own value, then pass it through and mirror it on the side output
+ SingleOutputStreamOperator<Integer> passThroughStream = env
+ .fromCollection(elements)
+ .keyBy(new KeySelector<Integer, Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer getKey(Integer value) throws Exception {
+ return value;
+ }
+ })
+ .process(new ProcessFunction<Integer, Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement(
+ Integer value, Context ctx, Collector<Integer> out) throws Exception {
+ out.collect(value);
+ ctx.output(sideOutputTag, "sideout-" + value);
+ }
+ });
+
+ passThroughStream.getSideOutput(sideOutputTag).addSink(sideSink);
+ passThroughStream.addSink(mainSink);
+ env.execute();
+
+ assertEquals(
+ Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"),
+ sideSink.getSortedResult());
+ assertEquals(
+ Arrays.asList(1, 2, 3, 4, 5),
+ mainSink.getSortedResult());
+ }
+
+
+ /**
+ * Verifies that nothing arrives on a side output when a non-keyed
+ * {@code ProcessFunction} emits with a different {@code OutputTag} than the one
+ * the side output is requested with.
+ */
+ @Test
+ public void testProcessFunctionSideOutputWithWrongTag() throws Exception {
+ TestListResultSink<String> sideSink = new TestListResultSink<>();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(3);
+
+ // the function emits to otherSideOutputTag ...
+ SingleOutputStreamOperator<Integer> processed = env
+ .fromCollection(elements)
+ .process(new ProcessFunction<Integer, Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement(
+ Integer value, Context ctx, Collector<Integer> out) throws Exception {
+ out.collect(value);
+ ctx.output(otherSideOutputTag, "sideout-" + value);
+ }
+ });
+
+ // ... but the sink listens on sideOutputTag
+ processed.getSideOutput(sideOutputTag).addSink(sideSink);
+
+ env.execute();
+
+ assertEquals(Arrays.asList(), sideSink.getSortedResult());
+ }
+
+ /**
+ * Verifies that nothing arrives on a side output when a {@code ProcessFunction}
+ * on a keyed stream emits with a different {@code OutputTag} than the one the
+ * side output is requested with.
+ */
+ @Test
+ public void testKeyedProcessFunctionSideOutputWithWrongTag() throws Exception {
+ TestListResultSink<String> sideSink = new TestListResultSink<>();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(3);
+
+ // key each element by its own value; the function emits to otherSideOutputTag ...
+ SingleOutputStreamOperator<Integer> processed = env
+ .fromCollection(elements)
+ .keyBy(new KeySelector<Integer, Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer getKey(Integer value) throws Exception {
+ return value;
+ }
+ })
+ .process(new ProcessFunction<Integer, Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement(
+ Integer value, Context ctx, Collector<Integer> out) throws Exception {
+ out.collect(value);
+ ctx.output(otherSideOutputTag, "sideout-" + value);
+ }
+ });
+
+ // ... but the sink listens on sideOutputTag
+ processed.getSideOutput(sideOutputTag).addSink(sideSink);
+
+ env.execute();
+
+ assertEquals(Arrays.asList(), sideSink.getSortedResult());
+ }
+
+
private static class TestWatermarkAssigner implements AssignerWithPunctuatedWatermarks<Integer> {
private static final long serialVersionUID = 1L;