You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/18 07:13:42 UTC

[03/11] flink git commit: [FLINK-4460] Add side outputs for ProcessFunction

[FLINK-4460] Add side outputs for ProcessFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f828657d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f828657d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f828657d

Branch: refs/heads/master
Commit: f828657dfe2fa96c87c3545cb0aa679d67587862
Parents: d406915
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 21 12:38:04 2016 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Mar 18 07:44:17 2017 +0100

----------------------------------------------------------------------
 .../examples/sideoutput/SideOutputExample.java  | 151 +++++++++++++++++++
 .../api/functions/ProcessFunction.java          |   9 ++
 .../api/operators/KeyedProcessOperator.java     |  23 ++-
 .../api/operators/ProcessOperator.java          |   6 +
 .../streaming/runtime/SideOutputITCase.java     | 146 ++++++++++++++++++
 5 files changed, 329 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f828657d/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
new file mode 100644
index 0000000..bfd3cd7
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.sideoutput;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+/**
+ * An example that illustrates the use of side outputs.
+ *
+ * <p>This is a modified version of {@link org.apache.flink.streaming.examples.windowing.WindowWordCount}
+ * that has a filter in the tokenizer and only emits some words for counting
+ * while emitting the other words to a side output.
+ */
+public class SideOutputExample {
+
+	/**
+	 * We need to create an {@link OutputTag} so that we can reference it when emitting
+	 * data to a side output and also to retrieve the side output stream from an operation.
+	 */
+	static final OutputTag<String> rejectedWordsTag = new OutputTag<String>("rejected") {};
+
+	public static void main(String[] args) throws Exception {
+
+		// parse the command line arguments (--input, --output, --rejected-words-output)
+		final ParameterTool params = ParameterTool.fromArgs(args);
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		// make parameters available in the web interface
+		env.getConfig().setGlobalJobParameters(params);
+
+		// get input data
+		DataStream<String> text;
+		if (params.has("input")) {
+			// read the text file from given input path
+			text = env.readTextFile(params.get("input"));
+		} else {
+			System.out.println("Executing WordCount example with default input data set.");
+			System.out.println("Use --input to specify file input.");
+			// get default test text data
+			text = env.fromElements(WordCountData.WORDS);
+		}
+
+		SingleOutputStreamOperator<Tuple2<String, Integer>> tokenized = text
+				.keyBy(new KeySelector<String, Integer>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Integer getKey(String value) throws Exception {
+						return 0;
+					}
+				})
+				.process(new Tokenizer());
+
+		DataStream<String> rejectedWords = tokenized
+				.getSideOutput(rejectedWordsTag)
+				.map(new MapFunction<String, String>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public String map(String value) throws Exception {
+						return "rejected: " + value;
+					}
+				});
+
+		// group by the tuple field "0" and sum up tuple field "1" in tumbling windows of 5 seconds
+		DataStream<Tuple2<String, Integer>> counts = tokenized
+				.keyBy(0)
+				.window(TumblingEventTimeWindows.of(Time.seconds(5)))
+				.sum(1);
+
+		// emit result: writing to files requires both paths, so a missing one is reported by name
+		if (params.has("output")) {
+			counts.writeAsText(params.get("output"));
+			rejectedWords.writeAsText(params.getRequired("rejected-words-output"));
+		} else {
+			System.out.println("Printing result to stdout. Use --output and --rejected-words-output to specify output paths.");
+			counts.print();
+			rejectedWords.print();
+		}
+
+		// execute program
+		env.execute("Streaming WordCount SideOutput");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Implements the string tokenizer that splits sentences into words as a
+	 * user-defined ProcessFunction. The function takes a line (String) and
+	 * splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2<String,
+	 * Integer>}).
+	 *
+	 * <p>Words longer than 5 characters are rejected: they go to the side output instead.
+	 */
+	public static final class Tokenizer extends ProcessFunction<String, Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void processElement(
+				String value,
+				Context ctx,
+				Collector<Tuple2<String, Integer>> out) throws Exception {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 5) {
+					ctx.output(rejectedWordsTag, token);
+				} else if (token.length() > 0) {
+					out.collect(new Tuple2<>(token, 1));
+				}
+			}
+
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f828657d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
index 48418af..43683ef 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 
 /**
  * A function that processes elements of a stream.
@@ -101,6 +102,14 @@ public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
 		 * A {@link TimerService} for querying time and registering timers.
 		 */
 		public abstract TimerService timerService();
+
+		/**
+		 * Emits a record to the side output identified by the {@link OutputTag}.
+		 * 
+		 * @param outputTag The {@code OutputTag} that identifies the side output to emit to.
+		 * @param value The record to emit.
+		 */
+		public abstract <X> void output(OutputTag<X> outputTag, X value);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f828657d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
index d8dfb0f..97bc8ce 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -38,9 +39,9 @@ public class KeyedProcessOperator<K, IN, OUT>
 
 	private transient TimestampedCollector<OUT> collector;
 
-	private transient ContextImpl<IN, OUT> context;
+	private transient ContextImpl context;
 
-	private transient OnTimerContextImpl<IN, OUT> onTimerContext;
+	private transient OnTimerContextImpl onTimerContext;
 
 	public KeyedProcessOperator(ProcessFunction<IN, OUT> function) {
 		super(function);
@@ -58,8 +59,8 @@ public class KeyedProcessOperator<K, IN, OUT>
 
 		TimerService timerService = new SimpleTimerService(internalTimerService);
 
-		context = new ContextImpl<>(userFunction, timerService);
-		onTimerContext = new OnTimerContextImpl<>(userFunction, timerService);
+		context = new ContextImpl(userFunction, timerService);
+		onTimerContext = new OnTimerContextImpl(userFunction, timerService);
 	}
 
 	@Override
@@ -90,7 +91,7 @@ public class KeyedProcessOperator<K, IN, OUT>
 		context.element = null;
 	}
 
-	private static class ContextImpl<IN, OUT> extends ProcessFunction<IN, OUT>.Context {
+	private class ContextImpl extends ProcessFunction<IN, OUT>.Context {
 
 		private final TimerService timerService;
 
@@ -113,12 +114,17 @@ public class KeyedProcessOperator<K, IN, OUT>
 		}
 
 		@Override
+		public <X> void output(OutputTag<X> outputTag, X value) {
+			output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
+		}
+
+		@Override
 		public TimerService timerService() {
 			return timerService;
 		}
 	}
 
-	private static class OnTimerContextImpl<IN, OUT> extends ProcessFunction<IN, OUT>.OnTimerContext{
+	private class OnTimerContextImpl extends ProcessFunction<IN, OUT>.OnTimerContext{
 
 		private final TimerService timerService;
 
@@ -144,6 +150,11 @@ public class KeyedProcessOperator<K, IN, OUT>
 		}
 
 		@Override
+		public <X> void output(OutputTag<X> outputTag, X value) {
+			output.collect(outputTag, new StreamRecord<>(value, timer.getTimestamp()));
+		}
+
+		@Override
 		public TimerService timerService() {
 			return timerService;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f828657d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
index 13b68f4..f73b610 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.OutputTag;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -93,6 +94,11 @@ public class ProcessOperator<IN, OUT>
 		}
 
 		@Override
+		public <X> void output(OutputTag<X> outputTag, X value) {
+			output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
+		}
+
+		@Override
 		public long currentProcessingTime() {
 			return processingTimeService.getCurrentProcessingTime();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f828657d/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
index 2f92897..f87ee30 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
@@ -18,6 +18,7 @@
 package org.apache.flink.test.streaming.runtime;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -57,6 +58,151 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase {
 		elements.add(4);
 	}
 
+	private final static OutputTag<String> sideOutputTag = new OutputTag<String>("side"){};
+	private final static OutputTag<String> otherSideOutputTag = new OutputTag<String>("other-side"){};
+
+	/**
+	 * Test ProcessFunction side output.
+	 */
+	@Test
+	public void testProcessFunctionSideOutput() throws Exception {
+		TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
+		TestListResultSink<Integer> resultSink = new TestListResultSink<>();
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(3);
+
+		DataStream<Integer> dataStream = see.fromCollection(elements);
+
+		SingleOutputStreamOperator<Integer> passThroughtStream = dataStream
+				.process(new ProcessFunction<Integer, Integer>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void processElement(
+							Integer value, Context ctx, Collector<Integer> out) throws Exception {
+						out.collect(value);
+						ctx.output(sideOutputTag, "sideout-" + String.valueOf(value));
+					}
+				});
+
+		passThroughtStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
+		passThroughtStream.addSink(resultSink);
+		see.execute();
+
+		assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"), sideOutputResultSink.getSortedResult());
+		assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
+	}
+
+	/**
+	 * Test keyed ProcessFunction side output.
+	 */
+	@Test
+	public void testKeyedProcessFunctionSideOutput() throws Exception {
+		TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
+		TestListResultSink<Integer> resultSink = new TestListResultSink<>();
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(3);
+
+		DataStream<Integer> dataStream = see.fromCollection(elements);
+
+		SingleOutputStreamOperator<Integer> passThroughtStream = dataStream
+				.keyBy(new KeySelector<Integer, Integer>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Integer getKey(Integer value) throws Exception {
+						return value;
+					}
+				})
+				.process(new ProcessFunction<Integer, Integer>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void processElement(
+							Integer value, Context ctx, Collector<Integer> out) throws Exception {
+						out.collect(value);
+						ctx.output(sideOutputTag, "sideout-" + String.valueOf(value));
+					}
+				});
+
+		passThroughtStream.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
+		passThroughtStream.addSink(resultSink);
+		see.execute();
+
+		assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"), sideOutputResultSink.getSortedResult());
+		assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
+	}
+
+
+	/**
+	 * Test that a ProcessFunction side output stays empty when records are emitted to a different {@code OutputTag}.
+	 */
+	@Test
+	public void testProcessFunctionSideOutputWithWrongTag() throws Exception {
+		TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(3);
+
+		DataStream<Integer> dataStream = see.fromCollection(elements);
+
+		dataStream
+				.process(new ProcessFunction<Integer, Integer>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void processElement(
+							Integer value, Context ctx, Collector<Integer> out) throws Exception {
+						out.collect(value);
+						ctx.output(otherSideOutputTag, "sideout-" + String.valueOf(value));
+					}
+				}).getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
+
+		see.execute();
+
+		assertEquals(Arrays.asList(), sideOutputResultSink.getSortedResult());
+	}
+
+	/**
+	 * Test that a keyed ProcessFunction side output stays empty when records are emitted to a different {@code OutputTag}.
+	 */
+	@Test
+	public void testKeyedProcessFunctionSideOutputWithWrongTag() throws Exception {
+		TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(3);
+
+		DataStream<Integer> dataStream = see.fromCollection(elements);
+
+		dataStream
+				.keyBy(new KeySelector<Integer, Integer>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Integer getKey(Integer value) throws Exception {
+						return value;
+					}
+				})
+				.process(new ProcessFunction<Integer, Integer>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void processElement(
+							Integer value, Context ctx, Collector<Integer> out) throws Exception {
+						out.collect(value);
+						ctx.output(otherSideOutputTag, "sideout-" + String.valueOf(value));
+					}
+				}).getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
+
+		see.execute();
+
+		assertEquals(Arrays.asList(), sideOutputResultSink.getSortedResult());
+	}
+
+
 	private static class TestWatermarkAssigner implements AssignerWithPunctuatedWatermarks<Integer> {
 		private static final long serialVersionUID = 1L;