You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/02/17 16:24:15 UTC

[5/8] flink git commit: [FLINK-4997] [streaming] Introduce ProcessWindowFunction

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 104bc7b..a9c3ef6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -59,7 +60,9 @@ import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
@@ -432,6 +435,78 @@ public class WindowOperatorTest extends TestLogger {
 
 	@Test
 	@SuppressWarnings("unchecked")
+	public void testSessionWindowsWithProcessFunction() throws Exception {
+		closeCalled.set(0); // NOTE(review): not read again within this test; confirm the reset is still needed
+		// Event-time session windows evaluated by SessionProcessWindowFunction, with a snapshot/restore mid-stream.
+		final int SESSION_SIZE = 3; // gap passed to Time.seconds(...) below
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+		// List state descriptor ("window-contents") handed to the WindowOperator below.
+		ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
+				inputType.createSerializer(new ExecutionConfig()));
+		// Operator under test: session assigner + event-time trigger + the process function wrapped for Iterable input.
+		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
+				EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalIterableProcessWindowFunction<>(new SessionProcessWindowFunction()),
+				EventTimeTrigger.create(),
+				0);
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		// add elements out-of-order
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
+
+		// do a snapshot, close and restore again
+		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.close();
+		testHarness.setup();
+		testHarness.initializeState(snapshot);
+		testHarness.open();
+		// The remaining input is fed to the restored operator; key1's third element arrives only now.
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5501));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 6050));
+
+		testHarness.processWatermark(new Watermark(12000));
+		// Expected per session: key + "-" + sum of f1, window [first ts, last ts + 3000), record timestamp = window end - 1.
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), 5499));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), 9049));
+		expectedOutput.add(new Watermark(12000));
+		// Two more key2 elements at t=15000 are expected to form a separate session (see "key2-30" below).
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 15000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), 15000));
+
+		testHarness.processWatermark(new Watermark(17999));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), 17999));
+		expectedOutput.add(new Watermark(17999));
+		// Compare expected and actual output after sorting with Tuple3ResultSortComparator.
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
+		testHarness.close();
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
 	public void testReduceSessionWindows() throws Exception {
 		closeCalled.set(0);
 
@@ -500,6 +575,76 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.close();
 	}
 
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testReduceSessionWindowsWithProcessFunction() throws Exception {
+		closeCalled.set(0); // NOTE(review): not read again within this test; confirm the reset is still needed
+		// Event-time session windows reduced with SumReducer, then passed to ReducedProcessSessionWindowFunction.
+		final int SESSION_SIZE = 3; // gap passed to Time.seconds(...) below
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+		// Reducing state descriptor ("window-contents") built with SumReducer and handed to the WindowOperator below.
+		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>(
+				"window-contents", new SumReducer(), inputType.createSerializer(new ExecutionConfig()));
+		// Operator under test: session assigner + event-time trigger + the process function wrapped for single-value input.
+		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
+				EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalSingleValueProcessWindowFunction<>(new ReducedProcessSessionWindowFunction()),
+				EventTimeTrigger.create(),
+				0);
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		// add elements out-of-order
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
+
+		// do a snapshot, close and restore again
+		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.close();
+		testHarness.setup();
+		testHarness.initializeState(snapshot);
+		testHarness.open();
+		// The remaining input, including all key1 elements, is fed to the restored operator.
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5501));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 6050));
+
+		testHarness.processWatermark(new Watermark(12000));
+		// Expected per session: key + "-" + sum of f1, window [first ts, last ts + 3000), record timestamp = window end - 1.
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), 5499));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), 9049));
+		expectedOutput.add(new Watermark(12000));
+		// Two more key2 elements at t=15000 are expected to form a separate session (see "key2-30" below).
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 15000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), 15000));
+
+		testHarness.processWatermark(new Watermark(17999));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), 17999));
+		expectedOutput.add(new Watermark(17999));
+		// Compare expected and actual output after sorting with Tuple3ResultSortComparator.
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
+		testHarness.close();
+	}
+
 	/**
 	 * This tests whether merging works correctly with the CountTrigger.
 	 * @throws Exception
@@ -2379,6 +2524,38 @@ public class WindowOperatorTest extends TestLogger {
 		}
 	}
 
+	/** Emits one ("key-sum of f1", window start, window end) tuple per invocation of {@code process}. */
+	public static class SessionProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void process(String key, Context context, Iterable<Tuple2<String, Integer>> values,
+				Collector<Tuple3<String, Long, Long>> out) throws Exception {
+			// Add up the integer field of every element passed in for this window.
+			int total = 0;
+			for (Tuple2<String, Integer> element : values) {
+				total += element.f1;
+			}
+			final TimeWindow window = context.window();
+			final String label = key + "-" + total;
+			out.collect(new Tuple3<>(label, window.getStart(), window.getEnd()));
+		}
+	}
+
+	/** Emits a ("key-f1", window start, window end) tuple for each element handed to {@code process}. */
+	public static class ReducedProcessSessionWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void process(String key, Context context, Iterable<Tuple2<String, Integer>> values,
+				Collector<Tuple3<String, Long, Long>> out) throws Exception {
+			final TimeWindow window = context.window();
+			for (Tuple2<String, Integer> reduced : values) {
+				final String label = key + "-" + reduced.f1;
+				out.collect(new Tuple3<>(label, window.getStart(), window.getEnd()));
+			}
+		}
+	}
 
 	public static class PointSessionWindows extends EventTimeSessionWindows {
 		private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
index 1e3e3d5..7d37d1a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
@@ -18,17 +18,22 @@
 package org.apache.flink.test.streaming.runtime;
 
 import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -114,6 +119,79 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 	}
 
 	@Test
+	public void testFoldProcessWindow() throws Exception {
+		// Folds tumbling 3 ms event-time windows per key, then post-processes the folded value in a ProcessWindowFunction.
+		testResults = new ArrayList<>();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(1);
+		// Finite source of nine (key, value) tuples; timestamps and watermarks are assigned by Tuple2TimestampExtractor.
+		DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+				ctx.collect(Tuple2.of("a", 0));
+				ctx.collect(Tuple2.of("a", 1));
+				ctx.collect(Tuple2.of("a", 2));
+
+				ctx.collect(Tuple2.of("b", 3));
+				ctx.collect(Tuple2.of("b", 4));
+				ctx.collect(Tuple2.of("b", 5));
+
+				ctx.collect(Tuple2.of("a", 6));
+				ctx.collect(Tuple2.of("a", 7));
+				ctx.collect(Tuple2.of("a", 8));
+
+				// source is finite, so it will have an implicit MAX watermark when it finishes
+			}
+
+			@Override
+			public void cancel() {}
+
+		}).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());
+		// The fold starts from (0, "R:"), appends each key to f1 and adds each value to f0.
+		source1
+			.keyBy(0)
+			.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+			.fold(Tuple2.of(0, "R:"), new FoldFunction<Tuple2<String, Integer>, Tuple2<Integer, String>>() {
+				@Override
+				public Tuple2<Integer, String> fold(Tuple2<Integer, String> accumulator, Tuple2<String, Integer> value) throws Exception {
+					accumulator.f1 += value.f0;
+					accumulator.f0 += value.f1;
+					return accumulator;
+				}
+			}, new ProcessWindowFunction<Tuple2<Integer, String>, Tuple3<String, Integer, Integer>, Tuple, TimeWindow>() {
+				@Override
+				public void process(Tuple tuple, Context context, Iterable<Tuple2<Integer, String>> elements, Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
+					int i = 0; // position of the element within this invocation's Iterable
+					for (Tuple2<Integer, String> in : elements) {
+						out.collect(new Tuple3<>(in.f1, in.f0, i++));
+					}
+				}
+			})
+			.addSink(new SinkFunction<Tuple3<String, Integer, Integer>>() {
+				@Override
+				public void invoke(Tuple3<String, Integer, Integer> value) throws Exception {
+					testResults.add(value.toString());
+				}
+			});
+
+		env.execute("Fold Process Window Test");
+		// Every expected tuple carries index 0, i.e. exactly one folded element is expected per fired window.
+		List<String> expectedResult = Arrays.asList(
+			"(R:aaa,3,0)",
+			"(R:aaa,21,0)",
+			"(R:bbb,12,0)");
+		// Sort both lists so the comparison does not depend on emission order.
+		Collections.sort(expectedResult);
+		Collections.sort(testResults);
+
+		Assert.assertEquals(expectedResult, testResults);
+	}
+
+	@Test
 	public void testFoldAllWindow() throws Exception {
 
 		testResults = new ArrayList<>();