You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/11 11:15:49 UTC
[4/6] flink git commit: [FLINK-5237] Consolidate and harmonize Window
Translation Tests
[FLINK-5237] Consolidate and harmonize Window Translation Tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa220e48
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa220e48
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa220e48
Branch: refs/heads/master
Commit: aa220e487db88079c224e50190055cef41df3f9a
Parents: 8c8c028
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Nov 24 08:14:48 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Jan 11 10:35:47 2017 +0100
----------------------------------------------------------------------
.../windowing/AllWindowTranslationTest.java | 841 ++++++++++---
.../windowing/TimeWindowTranslationTest.java | 112 +-
.../windowing/WindowTranslationTest.java | 844 ++++++++++---
flink-streaming-scala/pom.xml | 8 +
.../api/scala/AllWindowTranslationTest.scala | 1086 ++++++++++++++---
.../api/scala/TimeWindowTranslationTest.scala | 182 +++
.../api/scala/WindowTranslationTest.scala | 1131 +++++++++++++++---
7 files changed, 3561 insertions(+), 643 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/aa220e48/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 72b0850..3d4de5d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,121 +17,548 @@
*/
package org.apache.flink.streaming.runtime.operators.windowing;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFoldFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
- * These tests verify that the api calls on
- * {@link org.apache.flink.streaming.api.datastream.AllWindowedStream} instantiate
- * the correct window operator.
+ * These tests verify that the api calls on {@link AllWindowedStream} instantiate the correct
+ * window operator.
+ *
+ * <p>We also create a test harness and push one element into the operator to verify
+ * that we get some output.
*/
+@SuppressWarnings("serial")
public class AllWindowTranslationTest {
/**
- * These tests ensure that the correct trigger is set when using event-time windows.
+ * .reduce() does not support RichReduceFunction, since the reduce function is used internally
+ * in a {@code ReducingState}.
*/
- @Test
- @SuppressWarnings("rawtypes")
- public void testEventTime() throws Exception {
+ @Test(expected = UnsupportedOperationException.class)
+ public void testReduceWithRichReducerFails() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ source
+ .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .reduce(new RichReduceFunction<Tuple2<String, Integer>>() {
+ private static final long serialVersionUID = -6448847205314995812L;
+
+ @Override
+ public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
+ Tuple2<String, Integer> value2) throws Exception {
+ return null;
+ }
+ });
+
+ fail("exception was not thrown");
+ }
+
+ /**
+ * .fold() does not support RichFoldFunction, since the fold function is used internally
+ * in a {@code FoldingState}.
+ */
+ @Test(expected = UnsupportedOperationException.class)
+ public void testFoldWithRichFolderFails() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ source
+ .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .fold(new Tuple2<>("", 0), new RichFoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
+ private static final long serialVersionUID = -6448847205314995812L;
+
+ @Override
+ public Tuple2<String, Integer> fold(Tuple2<String, Integer> value1,
+ Tuple2<String, Integer> value2) throws Exception {
+ return null;
+ }
+ });
+
+ fail("exception was not thrown");
+ }
+
+
+ @Test
+ public void testSessionWithFoldFails() throws Exception {
+ // verify that fold does not work with merging windows
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ AllWindowedStream<String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao")
+ .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));
+
+ try {
+ windowedStream.fold("", new FoldFunction<String, String>() {
+ private static final long serialVersionUID = -4567902917104921706L;
+
+ @Override
+ public String fold(String accumulator, String value) throws Exception {
+ return accumulator;
+ }
+ });
+ } catch (UnsupportedOperationException e) {
+ // expected
+ // use a catch to ensure that the exception is thrown by the fold
+ return;
+ }
+
+ fail("The fold call should fail.");
+ }
+
+ @Test
+ public void testMergingAssignerWithNonMergingTriggerFails() throws Exception {
+ // verify that we check for trigger compatibility
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ AllWindowedStream<String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao")
+ .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));
+
+ try {
+ windowedStream.trigger(new Trigger<String, TimeWindow>() {
+ private static final long serialVersionUID = 6558046711583024443L;
+
+ @Override
+ public TriggerResult onElement(String element,
+ long timestamp,
+ TimeWindow window,
+ TriggerContext ctx) throws Exception {
+ return null;
+ }
+
+ @Override
+ public TriggerResult onProcessingTime(long time,
+ TimeWindow window,
+ TriggerContext ctx) throws Exception {
+ return null;
+ }
+
+ @Override
+ public TriggerResult onEventTime(long time,
+ TimeWindow window,
+ TriggerContext ctx) throws Exception {
+ return null;
+ }
+
+ @Override
+ public boolean canMerge() {
+ return false;
+ }
+
+ @Override
+ public void clear(TimeWindow window, TriggerContext ctx) throws Exception {}
+ });
+ } catch (UnsupportedOperationException e) {
+ // expected
+ // use a catch to ensure that the exception is thrown by the trigger call
+ return;
+ }
+
+ fail("The trigger call should fail.");
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testReduceEventTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
- DummyReducer reducer = new DummyReducer();
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
- .reduce(reducer);
+ .reduce(new DummyReducer());
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testReduceProcessingTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .reduce(new DummyReducer());
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+
+ /**
+ * Ignored because we currently don't have the fast processing-time window operator.
+ */
+ @Test
+ @SuppressWarnings("rawtypes")
+ @Ignore
+ public void testReduceFastProcessingTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window = source
+ .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .reduce(new DummyReducer());
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof AggregatingProcessingTimeWindowOperator);
+
+ processElementAndEnsureOutput(operator, null, BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testReduceWithWindowFunctionEventTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
- Assert.assertTrue(operator1 instanceof WindowOperator);
- WindowOperator winOperator1 = (WindowOperator) operator1;
- Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
- Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
- Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor);
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DummyReducer reducer = new DummyReducer();
- DataStream<Tuple2<String, Integer>> window2 = source
+ DataStream<Tuple3<String, String, Integer>> window = source
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+ .reduce(reducer, new AllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void apply(
TimeWindow window,
Iterable<Tuple2<String, Integer>> values,
- Collector<Tuple2<String, Integer>> out) throws Exception {
-
+ Collector<Tuple3<String, String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+ }
}
});
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
- Assert.assertTrue(operator2 instanceof WindowOperator);
- WindowOperator winOperator2 = (WindowOperator) operator2;
- Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
- Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
- Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+ processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
@Test
@SuppressWarnings("rawtypes")
- public void testNonEvicting() throws Exception {
+ public void testReduceWithWindowFunctionProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+ DataStream<Tuple3<String, String, Integer>> window = source
+ .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .reduce(new DummyReducer(), new AllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void apply(
+ TimeWindow window,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple3<String, String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+ processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ /**
+ * Test for the deprecated .apply(Reducer, WindowFunction).
+ */
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testApplyWithPreReducerEventTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
DummyReducer reducer = new DummyReducer();
- DataStream<Tuple2<String, Integer>> window1 = source
+ DataStream<Tuple3<String, String, Integer>> window = source
+ .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .apply(reducer, new AllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void apply(
+ TimeWindow window,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple3<String, String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+ processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testFoldEventTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple3<String, String, Integer>> window1 = source
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
- .trigger(CountTrigger.of(100))
- .reduce(reducer);
+ .fold(new Tuple3<>("", "", 1), new DummyFolder());
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testFoldProcessingTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple3<String, String, Integer>> window = source
+ .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .fold(new Tuple3<>("", "", 0), new DummyFolder());
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
- Assert.assertTrue(operator1 instanceof WindowOperator);
- WindowOperator winOperator1 = (WindowOperator) operator1;
- Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger);
- Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
- Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor);
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
- DataStream<Tuple2<String, Integer>> window2 = source
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testFoldWithWindowFunctionEventTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window = source
+ .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .fold(new Tuple3<>("", "", 0), new DummyFolder(), new AllWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void apply(
+ TimeWindow window,
+ Iterable<Tuple3<String, String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple3<String, String, Integer> in : values) {
+ out.collect(new Tuple2<>(in.f0, in.f2));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testFoldWithWindowFunctionProcessingTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window = source
+ .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new AllWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void apply(
+ TimeWindow window,
+ Iterable<Tuple3<String, String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple3<String, String, Integer> in : values) {
+ out.collect(new Tuple2<>(in.f0, in.f2));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testApplyWithPreFolderEventTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple3<String, String, Integer>> window = source
+ .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .apply(new Tuple3<>("", "", 0), new DummyFolder(), new AllWindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void apply(
+ TimeWindow window,
+ Iterable<Tuple3<String, String, Integer>> values,
+ Collector<Tuple3<String, String, Integer>> out) throws Exception {
+ for (Tuple3<String, String, Integer> in : values) {
+ out.collect(new Tuple3<>(in.f0, in.f1, in.f2));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testApplyEventTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .trigger(CountTrigger.of(100))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;
@@ -140,24 +567,65 @@ public class AllWindowTranslationTest {
TimeWindow window,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(in);
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testApplyProcessingTimeTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public void apply(
+ TimeWindow window,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(in);
+ }
}
});
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
- Assert.assertTrue(operator2 instanceof WindowOperator);
- WindowOperator winOperator2 = (WindowOperator) operator2;
- Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
- Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
- Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+
}
+
@Test
@SuppressWarnings("rawtypes")
- public void testEvicting() throws Exception {
+ public void testReduceWithCustomTrigger() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
@@ -165,22 +633,56 @@ public class AllWindowTranslationTest {
DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
- .evictor(CountEvictor.of(100))
+ .trigger(CountTrigger.of(1))
.reduce(reducer);
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
- Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
- EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1;
- Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
- Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
- Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
- Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ListStateDescriptor);
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
- DataStream<Tuple2<String, Integer>> window2 = source
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testFoldWithCustomTrigger() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple3<String, String, Integer>> window1 = source
+ .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .trigger(CountTrigger.of(1))
+ .fold(new Tuple3<>("", "", 1), new DummyFolder());
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testApplyWithCustomTrigger() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .trigger(CountTrigger.of(100))
- .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
+ .trigger(CountTrigger.of(1))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;
@@ -189,136 +691,146 @@ public class AllWindowTranslationTest {
TimeWindow window,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
-
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(in);
+ }
}
});
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
- Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
- EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2;
- Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
- Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
- Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
- Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
- /**
- * These tests ensure that a Fold buffer is used if possible
- */
@Test
@SuppressWarnings("rawtypes")
- public void testFoldBuffer() throws Exception {
+ public void testReduceWithEvictor() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
- env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
- DummyFolder folder = new DummyFolder();
+ DummyReducer reducer = new DummyReducer();
- DataStream<Integer> window1 = source
+ DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
- .fold(0, folder);
+ .evictor(CountEvictor.of(100))
+ .reduce(reducer);
- OneInputTransformation<Tuple2<String, Integer>, Integer> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Integer>) window1.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Integer> operator1 = transform1.getOperator();
- Assert.assertTrue(operator1 instanceof WindowOperator);
- WindowOperator winOperator1 = (WindowOperator) operator1;
- Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
- Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
- Assert.assertTrue(winOperator1.getStateDescriptor() instanceof FoldingStateDescriptor);
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof EvictingWindowOperator);
+ EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+ Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
- DataStream<Integer> window2 = source
- .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .evictor(CountEvictor.of(13))
- .fold(0, folder);
-
- OneInputTransformation<Tuple2<String, Integer>, Integer> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Integer>) window2.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Integer> operator2 = transform2.getOperator();
- Assert.assertTrue(operator2 instanceof WindowOperator);
- WindowOperator winOperator2 = (WindowOperator) operator2;
- Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
- Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
- Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
@Test
- public void testSessionWithFold() throws Exception {
- // verify that fold does not work with merging windows
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void testFoldWithEvictor() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
- StreamExecutionEnvironment env = LocalStreamEnvironment.createLocalEnvironment();
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
- AllWindowedStream<String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao")
- .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));
+ DataStream<Tuple3<String, String, Integer>> window1 = source
+ .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .evictor(CountEvictor.of(100))
+ .fold(new Tuple3<>("", "", 1), new DummyFolder());
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof EvictingWindowOperator);
+ EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+ Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ winOperator.setOutputType((TypeInformation) window1.getType(), new ExecutionConfig());
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
- try {
- windowedStream.fold("", new FoldFunction<String, String>() {
- private static final long serialVersionUID = -8722899157560218917L;
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testApplyWithEvictor() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
- @Override
- public String fold(String accumulator, String value) throws Exception {
- return accumulator;
- }
- });
- } catch (UnsupportedOperationException e) {
- // expected
- // use a catch to ensure that the exception is thrown by the fold
- return;
- }
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
- fail("The fold call should fail.");
- }
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .trigger(CountTrigger.of(1))
+ .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
+ .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
- @Test
- public void testMergingAssignerWithNonMergingTrigger() throws Exception {
- // verify that we check for trigger compatibility
+ @Override
+ public void apply(
+ TimeWindow window,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(in);
+ }
+ }
+ });
- StreamExecutionEnvironment env = LocalStreamEnvironment.createLocalEnvironment();
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof EvictingWindowOperator);
+ EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getEvictor() instanceof TimeEvictor);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
- AllWindowedStream<String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao")
- .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
- try {
- windowedStream.trigger(new Trigger<String, TimeWindow>() {
- private static final long serialVersionUID = 8360971631424870421L;
+ /**
+ * Ensure that we get some output from the given operator when pushing in an element and
+ * setting watermark and processing time to {@code Long.MAX_VALUE}.
+ */
+ private static <K, IN, OUT> void processElementAndEnsureOutput(
+ OneInputStreamOperator<IN, OUT> operator,
+ KeySelector<IN, K> keySelector,
+ TypeInformation<K> keyType,
+ IN element) throws Exception {
- @Override
- public TriggerResult onElement(String element,
- long timestamp,
- TimeWindow window,
- TriggerContext ctx) throws Exception {
- return null;
- }
+ KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ operator,
+ keySelector,
+ keyType);
- @Override
- public TriggerResult onProcessingTime(long time,
- TimeWindow window,
- TriggerContext ctx) throws Exception {
- return null;
- }
+ testHarness.open();
- @Override
- public TriggerResult onEventTime(long time,
- TimeWindow window,
- TriggerContext ctx) throws Exception {
- return null;
- }
+ testHarness.setProcessingTime(0);
+ testHarness.processWatermark(Long.MIN_VALUE);
- @Override
- public boolean canMerge() {
- return false;
- }
+ testHarness.processElement(new StreamRecord<>(element, 0));
- @Override
- public void clear(TimeWindow window, TriggerContext ctx) throws Exception {}
- });
- } catch (UnsupportedOperationException e) {
- // expected
- // use a catch to ensure that the exception is thrown by the fold
- return;
- }
+ // provoke any processing-time/event-time triggers
+ testHarness.setProcessingTime(Long.MAX_VALUE);
+ testHarness.processWatermark(Long.MAX_VALUE);
- fail("The trigger call should fail.");
+ // we at least get the two watermarks and should also see an output element
+ assertTrue(testHarness.getOutput().size() >= 3);
+
+ testHarness.close();
}
// ------------------------------------------------------------------------
@@ -334,13 +846,12 @@ public class AllWindowTranslationTest {
}
}
- public static class DummyFolder implements FoldFunction<Tuple2<String, Integer>, Integer> {
- private static final long serialVersionUID = 1L;
-
+ private static class DummyFolder implements FoldFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> {
@Override
- public Integer fold(Integer accumulator, Tuple2<String, Integer> value) throws Exception {
+ public Tuple3<String, String, Integer> fold(
+ Tuple3<String, String, Integer> accumulator,
+ Tuple2<String, Integer> value) throws Exception {
return accumulator;
}
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/aa220e48/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
index 5aa8151..8e37021 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,7 +17,9 @@
*/
package org.apache.flink.streaming.runtime.operators.windowing;
+import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
@@ -44,9 +46,8 @@ import org.junit.Test;
import java.util.concurrent.TimeUnit;
/**
- * These tests verify that the api calls on
- * {@link WindowedStream} instantiate
- * the correct window operator.
+ * These tests verify that the API calls on {@link WindowedStream} that use the "time" shortcut
+ * instantiate the correct window operator.
*/
public class TimeWindowTranslationTest {
@@ -56,8 +57,9 @@ public class TimeWindowTranslationTest {
*/
@Test
@Ignore
- public void testFastTimeWindows() throws Exception {
+ public void testReduceFastTimeWindows() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
@@ -71,8 +73,21 @@ public class TimeWindowTranslationTest {
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);
+ }
- DataStream<Tuple2<String, Integer>> window2 = source
+ /**
+ * This test ensures that the fast aligned time windows operator is used if the
+ * conditions are right.
+ */
+ @Test
+ @Ignore
+ public void testApplyFastTimeWindows() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
@@ -87,25 +102,25 @@ public class TimeWindowTranslationTest {
}
});
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
- Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+ Assert.assertTrue(operator1 instanceof AccumulatingProcessingTimeWindowOperator);
}
@Test
@SuppressWarnings("rawtypes")
- public void testEventTimeWindows() throws Exception {
+ public void testReduceEventTimeWindows() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
- DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
- DummyReducer reducer = new DummyReducer();
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(
+ Tuple2.of("hello", 1),
+ Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window1 = source
- .keyBy(0)
- .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
- .reduce(reducer);
+ .keyBy(0)
+ .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS),Time.of(100, TimeUnit.MILLISECONDS))
+ .reduce(new DummyReducer());
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
@@ -114,8 +129,43 @@ public class TimeWindowTranslationTest {
Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor);
+ }
- DataStream<Tuple2<String, Integer>> window2 = source
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testFoldEventTimeWindows() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(
+ Tuple2.of("hello", 1),
+ Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .keyBy(0)
+ .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS),Time.of(100, TimeUnit.MILLISECONDS))
+ .fold(new Tuple2<>("", 1), new DummyFolder());
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+ Assert.assertTrue(operator1 instanceof WindowOperator);
+ WindowOperator winOperator1 = (WindowOperator) operator1;
+ Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
+ Assert.assertTrue(winOperator1.getStateDescriptor() instanceof FoldingStateDescriptor);
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testApplyEventTimeWindows() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(
+ Tuple2.of("hello", 1),
+ Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
@@ -130,14 +180,13 @@ public class TimeWindowTranslationTest {
}
});
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
- Assert.assertTrue(operator2 instanceof WindowOperator);
- WindowOperator winOperator2 = (WindowOperator) operator2;
- Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
- Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
- Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
-
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+ Assert.assertTrue(operator1 instanceof WindowOperator);
+ WindowOperator winOperator1 = (WindowOperator) operator1;
+ Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator1.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ListStateDescriptor);
}
/**
@@ -187,7 +236,7 @@ public class TimeWindowTranslationTest {
// UDFs
// ------------------------------------------------------------------------
- public static class DummyReducer implements ReduceFunction<Tuple2<String, Integer>> {
+ private static class DummyReducer implements ReduceFunction<Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
@Override
@@ -195,4 +244,15 @@ public class TimeWindowTranslationTest {
return value1;
}
}
+
+ private static class DummyFolder
+ implements FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<String, Integer> fold(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
+ return value1;
+ }
+ }
+
}