You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/02/17 16:24:12 UTC
[2/8] flink git commit: [FLINK-5237] Consolidate and harmonize Window Translation Tests
[FLINK-5237] Consolidate and harmonize Window Translation Tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5368a7d3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5368a7d3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5368a7d3
Branch: refs/heads/master
Commit: 5368a7d32d96beb1b8298b87d9ea6d42ea306947
Parents: fe2a301
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Nov 24 08:14:48 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 17 17:15:51 2017 +0100
----------------------------------------------------------------------
.../operators/StateDescriptorPassingTest.java | 26 +
.../windowing/WindowTranslationTest.java | 718 +++++++++++++++--
.../ScalaProcessWindowFunctionWrapper.scala | 16 +-
.../api/scala/WindowTranslationTest.scala | 766 +++++++++++++++++--
4 files changed, 1420 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
index 26cb7ac..813ca96 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.time.Time;
@@ -136,6 +137,31 @@ public class StateDescriptorPassingTest {
}
@Test
+ public void testProcessWindowState() throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+
+ DataStream<File> src = env.fromElements(new File("/"));
+
+ SingleOutputStreamOperator<?> result = src
+ .keyBy(new KeySelector<File, String>() {
+ @Override
+ public String getKey(File value) {
+ return null;
+ }
+ })
+ .timeWindow(Time.milliseconds(1000))
+ .process(new ProcessWindowFunction<File, String, String, TimeWindow>() {
+ @Override
+ public void process(String s, Context ctx,
+ Iterable<File> input, Collector<String> out) {}
+ });
+
+ validateListStateDescriptorConfigured(result);
+ }
+
+ @Test
public void testFoldWindowAllState() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index f72a2f1..b899948 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -31,14 +31,17 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
@@ -112,7 +115,7 @@ public class WindowTranslationTest {
* in a {@code AggregatingState}.
*/
@Test(expected = UnsupportedOperationException.class)
- public void testAgrgegateWithRichFunctionFails() throws Exception {
+ public void testAggregateWithRichFunctionFails() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
@@ -405,6 +408,82 @@ public class WindowTranslationTest {
processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testReduceWithProcesWindowFunctionEventTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DummyReducer reducer = new DummyReducer();
+
+ DataStream<Tuple3<String, String, Integer>> window = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .reduce(reducer, new ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(String key,
+ Context ctx,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple3<String, String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+ processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testReduceWithProcessWindowFunctionProcessingTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple3<String, String, Integer>> window = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .reduce(new DummyReducer(), new ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(String tuple,
+ Context ctx,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple3<String, String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+ processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
/**
* Test for the deprecated .apply(Reducer, WindowFunction).
*/
@@ -447,6 +526,50 @@ public class WindowTranslationTest {
processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
+ /**
+ * Test for the deprecated .apply(Reducer, WindowFunction) when an evictor is set.
+ */
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testApplyWithPreReducerAndEvictor() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DummyReducer reducer = new DummyReducer();
+
+ DataStream<Tuple3<String, String, Integer>> window = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .evictor(CountEvictor.of(100))
+ .apply(reducer, new WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void apply(String key,
+ TimeWindow window,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple3<String, String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+
// ------------------------------------------------------------------------
// Aggregate Translation Tests
// ------------------------------------------------------------------------
@@ -463,13 +586,13 @@ public class WindowTranslationTest {
.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.aggregate(new DummyAggregationFunction());
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
- WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
@@ -492,13 +615,13 @@ public class WindowTranslationTest {
.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.aggregate(new DummyAggregationFunction());
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
- WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
@@ -529,7 +652,7 @@ public class WindowTranslationTest {
OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
- WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
@@ -569,6 +692,66 @@ public class WindowTranslationTest {
operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
+ @Test
+ public void testAggregateWithProcessWindowFunctionEventTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DummyReducer reducer = new DummyReducer();
+
+ DataStream<Tuple3<String, String, Integer>> window = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction());
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+ (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
+
+ processElementAndEnsureOutput(
+ operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ public void testAggregateWithProcessWindowFunctionProcessingTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple3<String, String, Integer>> window = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction());
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+ (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+
+ Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
+
+ processElementAndEnsureOutput(
+ operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
// ------------------------------------------------------------------------
// Fold Translation Tests
// ------------------------------------------------------------------------
@@ -664,83 +847,269 @@ public class WindowTranslationTest {
@SuppressWarnings("rawtypes")
public void testFoldWithWindowFunctionProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void apply(String key,
+ TimeWindow window,
+ Iterable<Tuple3<String, String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple3<String, String, Integer> in : values) {
+ out.collect(new Tuple2<>(in.f0, in.f2));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testFoldWithProcessWindowFunctionEventTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .fold(new Tuple3<>("", "", 0), new DummyFolder(), new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(String key,
+ Context ctx,
+ Iterable<Tuple3<String, String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple3<String, String, Integer> in : values) {
+ out.collect(new Tuple2<>(in.f0, in.f2));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testFoldWithProcessWindowFunctionProcessingTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(String key,
+ Context ctx,
+ Iterable<Tuple3<String, String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple3<String, String, Integer> in : values) {
+ out.collect(new Tuple2<>(in.f0, in.f2));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testApplyWithPreFolderEventTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple3<String, String, Integer>> window = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .apply(new Tuple3<>("", "", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void apply(String key,
+ TimeWindow window,
+ Iterable<Tuple3<String, String, Integer>> values,
+ Collector<Tuple3<String, String, Integer>> out) throws Exception {
+ for (Tuple3<String, String, Integer> in : values) {
+ out.collect(new Tuple3<>(in.f0, in. f1, in.f2));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String,String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testApplyWithPreFolderAndEvictor() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple3<String, String, Integer>> window = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .evictor(CountEvictor.of(100))
+ .apply(new Tuple3<>("", "", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void apply(String key,
+ TimeWindow window,
+ Iterable<Tuple3<String, String, Integer>> values,
+ Collector<Tuple3<String, String, Integer>> out) throws Exception {
+ for (Tuple3<String, String, Integer> in : values) {
+ out.collect(new Tuple3<>(in.f0, in. f1, in.f2));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String,String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+
+ // ------------------------------------------------------------------------
+ // Apply Translation Tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testApplyEventTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
- DataStream<Tuple2<String, Integer>> window = source
+ DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(new TupleKeySelector())
- .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+ .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void apply(String key,
TimeWindow window,
- Iterable<Tuple3<String, String, Integer>> values,
+ Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
- for (Tuple3<String, String, Integer> in : values) {
- out.collect(new Tuple2<>(in.f0, in.f2));
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(in);
}
}
});
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
- (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
- Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
- Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
- Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
@Test
@SuppressWarnings("rawtypes")
- public void testApplyWithPreFolderEventTime() throws Exception {
+ public void testApplyProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
- DataStream<Tuple3<String, String, Integer>> window = source
+ DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(new TupleKeySelector())
- .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .apply(new Tuple3<>("", "", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+ .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void apply(String key,
TimeWindow window,
- Iterable<Tuple3<String, String, Integer>> values,
- Collector<Tuple3<String, String, Integer>> out) throws Exception {
- for (Tuple3<String, String, Integer> in : values) {
- out.collect(new Tuple3<>(in.f0, in. f1, in.f2));
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(in);
}
}
});
- OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
- (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String,String, Integer>> operator = transform.getOperator();
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
- Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
- Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
- Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+ Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
- }
- // ------------------------------------------------------------------------
- // Apply Translation Tests
- // ------------------------------------------------------------------------
+ }
@Test
@SuppressWarnings("rawtypes")
- public void testApplyEventTime() throws Exception {
+ public void testProcessEventTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
@@ -749,12 +1118,12 @@ public class WindowTranslationTest {
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(new TupleKeySelector())
.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+ .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
- public void apply(String key,
- TimeWindow window,
+ public void process(String key,
+ Context ctx,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
for (Tuple2<String, Integer> in : values) {
@@ -776,7 +1145,7 @@ public class WindowTranslationTest {
@Test
@SuppressWarnings("rawtypes")
- public void testApplyProcessingTimeTime() throws Exception {
+ public void testProcessProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
@@ -785,12 +1154,12 @@ public class WindowTranslationTest {
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(new TupleKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+ .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
- public void apply(String key,
- TimeWindow window,
+ public void process(String key,
+ Context ctx,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
for (Tuple2<String, Integer> in : values) {
@@ -903,6 +1272,43 @@ public class WindowTranslationTest {
@Test
@SuppressWarnings("rawtypes")
+ public void testProcessWithCustomTrigger() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .trigger(CountTrigger.of(1))
+ .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(String key,
+ Context ctx,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(in);
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
public void testReduceWithEvictor() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
@@ -930,6 +1336,121 @@ public class WindowTranslationTest {
}
@Test
+ @SuppressWarnings("rawtypes")
+ public void testReduceWithEvictorAndProcessFunction() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DummyReducer reducer = new DummyReducer();
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .keyBy(0)
+ .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .evictor(CountEvictor.of(100))
+ .reduce(
+ reducer,
+ new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+ @Override
+ public void process(
+ Tuple tuple,
+ Context context,
+ Iterable<Tuple2<String, Integer>> elements,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : elements) {
+ out.collect(in);
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof EvictingWindowOperator);
+ EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ public void testAggregateWithEvictor() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .keyBy(new TupleKeySelector())
+ .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .evictor(CountEvictor.of(100))
+ .aggregate(new DummyAggregationFunction());
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+ (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(
+ winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ public void testAggregateWithEvictorAndProcessFunction() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .keyBy(new TupleKeySelector())
+ .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .evictor(CountEvictor.of(100))
+ .aggregate(
+ new DummyAggregationFunction(),
+ new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+ @Override
+ public void process(
+ String s,
+ Context context,
+ Iterable<Tuple2<String, Integer>> elements,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : elements) {
+ out.collect(in);
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+ (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(
+ winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+
+ @Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testFoldWithEvictor() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -958,6 +1479,48 @@ public class WindowTranslationTest {
}
@Test
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void testFoldWithEvictorAndProcessFunction() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple3<String, String, Integer>> window1 = source
+ .keyBy(0)
+ .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .evictor(CountEvictor.of(100))
+ .fold(
+ new Tuple3<>("", "", 1),
+ new DummyFolder(),
+ new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, Tuple, TimeWindow>() {
+ @Override
+ public void process(
+ Tuple tuple,
+ Context context,
+ Iterable<Tuple3<String, String, Integer>> elements,
+ Collector<Tuple3<String, String, Integer>> out) throws Exception {
+ for (Tuple3<String, String, Integer> in : elements) {
+ out.collect(in);
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof EvictingWindowOperator);
+ EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ winOperator.setOutputType((TypeInformation) window1.getType(), new ExecutionConfig());
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
@SuppressWarnings("rawtypes")
public void testApplyWithEvictor() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -996,6 +1559,45 @@ public class WindowTranslationTest {
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testProcessWithEvictor() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .trigger(CountTrigger.of(1))
+ .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
+ .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(String key,
+ Context ctx,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(in);
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof EvictingWindowOperator);
+ EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+ Assert.assertTrue(winOperator.getEvictor() instanceof TimeEvictor);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
/**
* Ensure that we get some output from the given operator when pushing in an element and
* setting watermark and processing time to {@code Long.MAX_VALUE}.
@@ -1012,6 +1614,12 @@ public class WindowTranslationTest {
keySelector,
keyType);
+ if (operator instanceof OutputTypeConfigurable) {
+ // use a dummy type since window functions just need the ExecutionConfig
+ // this is also only needed for Fold, which we're getting rid of soon.
+ ((OutputTypeConfigurable) operator).setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
+ }
+
testHarness.open();
testHarness.setProcessingTime(0);
@@ -1050,7 +1658,7 @@ public class WindowTranslationTest {
}
}
- private static class DummyAggregationFunction
+ private static class DummyAggregationFunction
implements AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {
@Override
@@ -1096,7 +1704,7 @@ public class WindowTranslationTest {
}
}
- private static class TestWindowFunction
+ private static class TestWindowFunction
implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow> {
@Override
@@ -1111,6 +1719,22 @@ public class WindowTranslationTest {
}
}
+ private static class TestProcessWindowFunction
+ extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow> {
+
+ @Override
+ public void process(String key,
+ Context ctx,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple3<String, String, Integer>> out) throws Exception {
+
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+ }
+ }
+ }
+
+
private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index 4a20371..23293a6 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -18,8 +18,6 @@
package org.apache.flink.streaming.api.scala.function.util
-import org.apache.flink.api.common.functions.{IterationRuntimeContext, RuntimeContext}
-import org.apache.flink.api.java.operators.translation.WrappingFunction
import org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => JProcessWindowFunction}
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.Window
@@ -37,8 +35,7 @@ import scala.collection.JavaConverters._
*/
final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
private[this] val func: ProcessWindowFunction[IN, OUT, KEY, W])
- extends WrappingFunction[ProcessWindowFunction[IN, OUT, KEY, W]](func)
- with JProcessWindowFunctionTrait[IN, OUT, KEY, W] {
+ extends JProcessWindowFunction[IN, OUT, KEY, W] {
override def process(
key: KEY,
@@ -50,15 +47,4 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
}
func.process(key, ctx, elements.asScala, out)
}
-
- override def getRuntimeContext: RuntimeContext = {
- throw new RuntimeException("This should never be called")
- }
-
- override def getIterationRuntimeContext: IterationRuntimeContext = {
- throw new RuntimeException("This should never be called")
- }
}
-
-private trait JProcessWindowFunctionTrait[IN, OUT, KEY, W]
- extends JProcessWindowFunction[IN, OUT, KEY, W]