You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/02/17 16:24:15 UTC
[5/8] flink git commit: [FLINK-4997] [streaming] Introduce
ProcessWindowFunction
http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 104bc7b..a9c3ef6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -59,7 +60,9 @@ import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
@@ -432,6 +435,78 @@ public class WindowOperatorTest extends TestLogger {
@Test
@SuppressWarnings("unchecked")
+ public void testSessionWindowsWithProcessFunction() throws Exception {
+ closeCalled.set(0);
+
+ final int SESSION_SIZE = 3;
+
+ TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+ ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
+ inputType.createSerializer(new ExecutionConfig()));
+
+ WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
+ EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+ new TimeWindow.Serializer(),
+ new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ stateDesc,
+ new InternalIterableProcessWindowFunction<>(new SessionProcessWindowFunction()),
+ EventTimeTrigger.create(),
+ 0);
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ testHarness.open();
+
+ // add elements out-of-order
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
+
+ // do a snapshot, close and restore again
+ OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+ testHarness.close();
+ testHarness.setup();
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5501));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 6050));
+
+ testHarness.processWatermark(new Watermark(12000));
+
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), 5499));
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
+
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), 9049));
+ expectedOutput.add(new Watermark(12000));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 15000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), 15000));
+
+ testHarness.processWatermark(new Watermark(17999));
+
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), 17999));
+ expectedOutput.add(new Watermark(17999));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
+ testHarness.close();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
public void testReduceSessionWindows() throws Exception {
closeCalled.set(0);
@@ -500,6 +575,76 @@ public class WindowOperatorTest extends TestLogger {
testHarness.close();
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testReduceSessionWindowsWithProcessFunction() throws Exception {
+ closeCalled.set(0);
+
+ final int SESSION_SIZE = 3;
+
+ TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+ ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>(
+ "window-contents", new SumReducer(), inputType.createSerializer(new ExecutionConfig()));
+
+ WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
+ EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+ new TimeWindow.Serializer(),
+ new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ stateDesc,
+ new InternalSingleValueProcessWindowFunction<>(new ReducedProcessSessionWindowFunction()),
+ EventTimeTrigger.create(),
+ 0);
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ testHarness.open();
+
+ // add elements out-of-order
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
+
+ // do a snapshot, close and restore again
+ OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+ testHarness.close();
+ testHarness.setup();
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5501));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 6050));
+
+ testHarness.processWatermark(new Watermark(12000));
+
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), 5499));
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), 9049));
+ expectedOutput.add(new Watermark(12000));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 15000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), 15000));
+
+ testHarness.processWatermark(new Watermark(17999));
+
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), 17999));
+ expectedOutput.add(new Watermark(17999));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
+ testHarness.close();
+ }
+
/**
* This tests whether merging works correctly with the CountTrigger.
* @throws Exception
@@ -2379,6 +2524,38 @@ public class WindowOperatorTest extends TestLogger {
}
}
+ public static class SessionProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(String key,
+ Context context,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple3<String, Long, Long>> out) throws Exception {
+ int sum = 0;
+ for (Tuple2<String, Integer> i: values) {
+ sum += i.f1;
+ }
+ String resultString = key + "-" + sum;
+ TimeWindow window = context.window();
+ out.collect(new Tuple3<>(resultString, window.getStart(), window.getEnd()));
+ }
+ }
+
+ public static class ReducedProcessSessionWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(String key,
+ Context context,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple3<String, Long, Long>> out) throws Exception {
+ TimeWindow window = context.window();
+ for (Tuple2<String, Integer> val: values) {
+ out.collect(new Tuple3<>(key + "-" + val.f1, window.getStart(), window.getEnd()));
+ }
+ }
+ }
public static class PointSessionWindows extends EventTimeSessionWindows {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
index 1e3e3d5..7d37d1a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
@@ -18,17 +18,22 @@
package org.apache.flink.test.streaming.runtime;
import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
@@ -114,6 +119,79 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
}
@Test
+ public void testFoldProcessWindow() throws Exception {
+
+ testResults = new ArrayList<>();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.setParallelism(1);
+
+ DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+ ctx.collect(Tuple2.of("a", 0));
+ ctx.collect(Tuple2.of("a", 1));
+ ctx.collect(Tuple2.of("a", 2));
+
+ ctx.collect(Tuple2.of("b", 3));
+ ctx.collect(Tuple2.of("b", 4));
+ ctx.collect(Tuple2.of("b", 5));
+
+ ctx.collect(Tuple2.of("a", 6));
+ ctx.collect(Tuple2.of("a", 7));
+ ctx.collect(Tuple2.of("a", 8));
+
+ // source is finite, so it will have an implicit MAX watermark when it finishes
+ }
+
+ @Override
+ public void cancel() {}
+
+ }).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());
+
+ source1
+ .keyBy(0)
+ .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+ .fold(Tuple2.of(0, "R:"), new FoldFunction<Tuple2<String, Integer>, Tuple2<Integer, String>>() {
+ @Override
+ public Tuple2<Integer, String> fold(Tuple2<Integer, String> accumulator, Tuple2<String, Integer> value) throws Exception {
+ accumulator.f1 += value.f0;
+ accumulator.f0 += value.f1;
+ return accumulator;
+ }
+ }, new ProcessWindowFunction<Tuple2<Integer, String>, Tuple3<String, Integer, Integer>, Tuple, TimeWindow>() {
+ @Override
+ public void process(Tuple tuple, Context context, Iterable<Tuple2<Integer, String>> elements, Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
+ int i = 0;
+ for (Tuple2<Integer, String> in : elements) {
+ out.collect(new Tuple3<>(in.f1, in.f0, i++));
+ }
+ }
+ })
+ .addSink(new SinkFunction<Tuple3<String, Integer, Integer>>() {
+ @Override
+ public void invoke(Tuple3<String, Integer, Integer> value) throws Exception {
+ testResults.add(value.toString());
+ }
+ });
+
+ env.execute("Fold Process Window Test");
+
+ List<String> expectedResult = Arrays.asList(
+ "(R:aaa,3,0)",
+ "(R:aaa,21,0)",
+ "(R:bbb,12,0)");
+
+ Collections.sort(expectedResult);
+ Collections.sort(testResults);
+
+ Assert.assertEquals(expectedResult, testResults);
+ }
+
+ @Test
public void testFoldAllWindow() throws Exception {
testResults = new ArrayList<>();