You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/11 11:15:47 UTC
[2/6] flink git commit: [FLINK-5237] Consolidate and harmonize Window
Translation Tests
http://git-wip-us.apache.org/repos/asf/flink/blob/aa220e48/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index 6273e54..c738955 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -19,247 +19,1017 @@
package org.apache.flink.streaming.api.scala
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.api.common.state.{ListStateDescriptor, ReducingStateDescriptor}
-import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.functions.{FoldFunction, RichFoldFunction, RichReduceFunction}
+import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescriptor, ReducingStateDescriptor}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala.function.{WindowFunction, AllWindowFunction}
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingEventTimeWindows, SlidingEventTimeWindows}
-import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, EventTimeTrigger, ProcessingTimeTrigger, Trigger}
+import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
import org.apache.flink.streaming.runtime.operators.windowing._
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.util.Collector
-
import org.junit.Assert._
-import org.junit.{Ignore, Test}
+import org.junit.Test
+
+/**
+ * These tests verify that the api calls on [[AllWindowedStream]] instantiate the correct
+ * window operator.
+ *
+ * We also create a test harness and push one element into the operator to verify
+ * that we get some output.
+ */
+class AllWindowTranslationTest {
+
+ /**
+ * .reduce() does not support [[RichReduceFunction]], since the reduce function is used
+ * internally in a [[org.apache.flink.api.common.state.ReducingState]].
+ */
+ @Test(expected = classOf[UnsupportedOperationException])
+ def testReduceWithRichReducerFails() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+ source
+ .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .reduce(new RichReduceFunction[(String, Int)] {
+ override def reduce(value1: (String, Int), value2: (String, Int)) = null
+ })
-class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
+ fail("exception was not thrown")
+ }
/**
- * These tests ensure that the fast aligned time windows operator is used if the
- * conditions are right.
- *
- * TODO: update once we have optimized aligned time windows operator for all-windows
- */
- @Ignore
+ * .fold() does not support [[RichFoldFunction]], since the fold function is used internally
+ * in a [[org.apache.flink.api.common.state.FoldingState]].
+ */
+ @Test(expected = classOf[UnsupportedOperationException])
+ def testFoldWithRichFolderFails() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+ source
+ .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .fold(("", 0), new RichFoldFunction[(String, Int), (String, Int)] {
+ override def fold(accumulator: (String, Int), value: (String, Int)) = null
+ })
+
+ fail("exception was not thrown")
+ }
+
+ @Test
+ def testSessionWithFoldFails() {
+ // verify that fold does not work with merging windows
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ val windowedStream = env.fromElements("Hello", "Ciao")
+ .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)))
+
+ try
+ windowedStream.fold("", new FoldFunction[String, String]() {
+ @throws[Exception]
+ def fold(accumulator: String, value: String): String = accumulator
+ })
+
+ catch {
+ case _: UnsupportedOperationException =>
+ // expected
+ // use a catch to ensure that the exception is thrown by the fold
+ return
+ }
+
+ fail("The fold call should fail.")
+ }
+
+ @Test
+ def testMergingAssignerWithNonMergingTriggerFails() {
+ // verify that we check for trigger compatibility
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ val windowedStream = env.fromElements("Hello", "Ciao")
+ .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)))
+
+ try
+ windowedStream.trigger(new Trigger[String, TimeWindow]() {
+ def onElement(
+ element: String,
+ timestamp: Long,
+ window: TimeWindow,
+ ctx: Trigger.TriggerContext) = null
+
+ def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext) = null
+
+ def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext) = null
+
+ override def canMerge = false
+
+ def clear(window: TimeWindow, ctx: Trigger.TriggerContext) {}
+ })
+
+ catch {
+ case _: UnsupportedOperationException =>
+ // expected
+ // use a catch to ensure that the exception is thrown by the trigger call
+ return
+ }
+
+ fail("The trigger call should fail.")
+ }
+
+ @Test
+ def testReduceEventTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .reduce(new DummyReducer)
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
@Test
- def testFastTimeWindows(): Unit = {
+ def testReduceProcessingTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val source = env.fromElements(("hello", 1), ("hello", 2))
- val reducer = new DummyReducer
+ val window1 = source
+ .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .reduce(new DummyReducer)
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
+ def testReduceEventTimeWithScalaFunction() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
val window1 = source
- .windowAll(SlidingEventTimeWindows.of(
- Time.of(1, TimeUnit.SECONDS),
- Time.of(100, TimeUnit.MILLISECONDS)))
- .reduce(reducer)
+ .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .reduce( (x, _) => x )
- val transform1 = window1.javaStream.getTransformation
+ val transform = window1
+ .javaStream
+ .getTransformation
.asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
- val operator1 = transform1.getOperator
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
- assertTrue(operator1.isInstanceOf[AggregatingProcessingTimeWindowOperator[_, _]])
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
- val window2 = source
- .keyBy(0)
- .windowAll(SlidingEventTimeWindows.of(
- Time.of(1, TimeUnit.SECONDS),
- Time.of(100, TimeUnit.MILLISECONDS)))
- .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
- def apply(
- window: TimeWindow,
- values: Iterable[(String, Int)],
- out: Collector[(String, Int)]) { }
- })
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
+ def testReduceWithWindowFunctionEventTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
- val transform2 = window2.javaStream.getTransformation
+ val window1 = source
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
+ .reduce(
+ new DummyReducer, new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+ override def apply(
+ window: TimeWindow,
+ input: Iterable[(String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
+ })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
.asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
- val operator2 = transform2.getOperator
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
- assertTrue(operator2.isInstanceOf[AccumulatingProcessingTimeWindowOperator[_, _, _]])
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
}
@Test
- def testNonEvicting(): Unit = {
+ def testReduceWithWindowFunctionProcessingTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val source = env.fromElements(("hello", 1), ("hello", 2))
- val reducer = new DummyReducer
+ val window1 = source
+ .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
+ .reduce(
+ new DummyReducer, new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+ override def apply(
+ window: TimeWindow,
+ input: Iterable[(String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
+ })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
+ def testApplyWithPreReducerEventTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
val window1 = source
- .windowAll(SlidingEventTimeWindows.of(
- Time.of(1, TimeUnit.SECONDS),
- Time.of(100, TimeUnit.MILLISECONDS)))
- .trigger(CountTrigger.of(100))
- .reduce(reducer)
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
+ .apply(
+ new DummyReducer, new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+ override def apply(
+ window: TimeWindow,
+ input: Iterable[(String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
+ })
- val transform1 = window1.javaStream.getTransformation
+ val transform = window1
+ .javaStream
+ .getTransformation
.asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
- val operator1 = transform1.getOperator
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
- assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]])
- val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]]
- assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
- assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
- assertTrue(
- winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
- val window2 = source
- .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .trigger(CountTrigger.of(100))
- .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
- def apply(
- window: TimeWindow,
- values: Iterable[(String, Int)],
- out: Collector[(String, Int)]) { }
- })
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
- val transform2 = window2.javaStream.getTransformation
+ @Test
+ def testReduceWithWindowFunctionEventTimeWithScalaFunction() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
+ .reduce(
+ { (x, _) => x },
+ { (_, in, out: Collector[(String, Int)]) => in foreach { x => out.collect(x)} })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
.asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
- val operator2 = transform2.getOperator
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
- assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]])
- val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _, _]]
- assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
- assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
- assertTrue(winOperator2.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
}
+
@Test
- def testEvicting(): Unit = {
+ def testFoldEventTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
val source = env.fromElements(("hello", 1), ("hello", 2))
- val reducer = new DummyReducer
+ val window1 = source
+ .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .fold(("", "", 1), new DummyFolder)
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
+ def testFoldProcessingTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
val window1 = source
- .windowAll(SlidingProcessingTimeWindows.of(
- Time.of(1, TimeUnit.SECONDS),
- Time.of(100, TimeUnit.MILLISECONDS)))
- .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
- .reduce(reducer)
+ .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .fold(("", "", 1), new DummyFolder)
- val transform1 = window1.javaStream.getTransformation
+ val transform = window1
+ .javaStream
+ .getTransformation
.asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
- val operator1 = transform1.getOperator
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
- assertTrue(operator1.isInstanceOf[EvictingWindowOperator[_, _, _, _]])
- val winOperator1 = operator1.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
- assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger])
- assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
- assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
- assertTrue(winOperator1.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+ assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
+ def testFoldEventTimeWithScalaFunction() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+ val source = env.fromElements(("hello", 1), ("hello", 2))
- val window2 = source
- .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .trigger(CountTrigger.of(100))
- .evictor(CountEvictor.of(1000))
- .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
- def apply(
- window: TimeWindow,
- values: Iterable[(String, Int)],
- out: Collector[(String, Int)]) { }
- })
+ val window1 = source
+ .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .fold(("", "", 1)) { (acc, _) => acc }
- val transform2 = window2.javaStream.getTransformation
+ val transform = window1
+ .javaStream
+ .getTransformation
.asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
- val operator2 = transform2.getOperator
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
- assertTrue(operator2.isInstanceOf[EvictingWindowOperator[_, _, _, _]])
- val winOperator2 = operator2.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
- assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
- assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
- assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
- assertTrue(winOperator2.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
}
+
@Test
- def testPreReduce(): Unit = {
+ def testFoldWithWindowFunctionEventTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
val source = env.fromElements(("hello", 1), ("hello", 2))
- val reducer = new DummyReducer
+ val window1 = source
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .fold(
+ ("", "", 1),
+ new DummyFolder,
+ new AllWindowFunction[(String, String, Int), (String, Int), TimeWindow] {
+ override def apply(
+ window: TimeWindow,
+ input: Iterable[(String, String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._3))}
+ })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
+ def testFoldWithWindowFunctionProcessingTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
val window1 = source
- .keyBy(0)
- .window(SlidingEventTimeWindows.of(
- Time.of(1, TimeUnit.SECONDS),
- Time.of(100, TimeUnit.MILLISECONDS)))
- .trigger(CountTrigger.of(100))
- .reduce(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
- def apply(
- tuple: Tuple,
- window: TimeWindow,
- values: Iterable[(String, Int)],
- out: Collector[(String, Int)]) { }
- })
+ .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .fold(
+ ("", "", 1),
+ new DummyFolder,
+ new AllWindowFunction[(String, String, Int), (String, Int), TimeWindow] {
+ override def apply(
+ window: TimeWindow,
+ input: Iterable[(String, String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._3))}
+ })
- val transform1 = window1.javaStream.getTransformation
+ val transform = window1
+ .javaStream
+ .getTransformation
.asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
- val operator1 = transform1.getOperator
-
- assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]])
- val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]]
- assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
- assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
- assertTrue(
- winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
-
-
- val window2 = source
- .keyBy(0)
- .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .trigger(CountTrigger.of(100))
- .reduce(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
- def apply(
- tuple: Tuple,
- window: TimeWindow,
- values: Iterable[(String, Int)],
- out: Collector[(String, Int)]) { }
- })
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
+ def testApplyWithPreFolderEventTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .apply(
+ ("", "", 1),
+ new DummyFolder,
+ new AllWindowFunction[(String, String, Int), (String, String, Int), TimeWindow] {
+ override def apply(
+ window: TimeWindow,
+ input: Iterable[(String, String, Int)],
+ out: Collector[(String, String, Int)]): Unit =
+ input foreach {x => out.collect((x._1, x._2, x._3))}
+ })
- val transform2 = window2.javaStream.getTransformation
+ val transform = window1
+ .javaStream
+ .getTransformation
.asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
- val operator2 = transform2.getOperator
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
- assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]])
- val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _, _]]
- assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
- assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
- assertTrue(
- winOperator2.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
}
-}
+ @Test
+ def testFoldWithWindowFunctionEventTimeWithScalaFunction() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .fold(
+ ("", "", 1),
+ { (acc: (String, String, Int), _) => acc },
+ { (_, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) =>
+ in foreach { x => out.collect((x._1, x._3)) }
+ })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+
+  /**
+   * Applying an [[AllWindowFunction]] to tumbling event-time windows must be translated into a
+   * [[WindowOperator]] that uses an [[EventTimeTrigger]], a [[TumblingEventTimeWindows]]
+   * assigner and list state. The operator is then driven once through the test harness.
+   */
+  @Test
+  def testApplyEventTime() {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val elements = streamEnv.fromElements(("hello", 1), ("hello", 2))
+
+    // window function that re-emits every buffered element as a fresh tuple
+    val forwardAll = new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+      override def apply(
+          window: TimeWindow,
+          input: Iterable[(String, Int)],
+          out: Collector[(String, Int)]): Unit = {
+        for (record <- input) {
+          out.collect((record._1, record._2))
+        }
+      }
+    }
+
+    val windowed = elements
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .apply(forwardAll)
+
+    val transformation = windowed.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val rawOperator = transformation.getOperator
+    assertTrue(rawOperator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val windowOp = rawOperator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(windowOp.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(windowOp.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(windowOp.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      operator = windowOp,
+      keySelector = windowOp.getKeySelector,
+      keyType = BasicTypeInfo.STRING_TYPE_INFO,
+      element = ("hello", 1))
+  }
+
+  /**
+   * Applying an [[AllWindowFunction]] to tumbling processing-time windows must yield a
+   * [[WindowOperator]] with a [[ProcessingTimeTrigger]], a [[TumblingProcessingTimeWindows]]
+   * assigner and list state. The operator is then driven once through the test harness.
+   */
+  @Test
+  def testApplyProcessingTimeTime() {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val elements = streamEnv.fromElements(("hello", 1), ("hello", 2))
+    val allWindows = elements
+      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+
+    val windowed = allWindows.apply(
+      new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+        // re-emits every buffered element as a fresh tuple
+        override def apply(
+            window: TimeWindow,
+            input: Iterable[(String, Int)],
+            out: Collector[(String, Int)]): Unit = {
+          for (record <- input) {
+            out.collect((record._1, record._2))
+          }
+        }
+      })
+
+    val transformation = windowed.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val rawOperator = transformation.getOperator
+    assertTrue(rawOperator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val windowOp = rawOperator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(windowOp.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(windowOp.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(windowOp.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      operator = windowOp,
+      keySelector = windowOp.getKeySelector,
+      keyType = BasicTypeInfo.STRING_TYPE_INFO,
+      element = ("hello", 1))
+  }
+
+  /**
+   * An `apply` given as a Scala closure on tumbling event-time windows must be translated into
+   * a [[WindowOperator]] with an [[EventTimeTrigger]], a [[TumblingEventTimeWindows]] assigner
+   * and list state. The operator is then driven once through the test harness.
+   */
+  @Test
+  def testApplyEventTimeWithScalaFunction() {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val elements = streamEnv.fromElements(("hello", 1), ("hello", 2))
+
+    val windowed = elements
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .apply { (timeWindow, buffered, collector: Collector[(String, Int)]) =>
+        // forward every buffered element unchanged
+        for (record <- buffered) {
+          collector.collect(record)
+        }
+      }
+
+    val transformation = windowed.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val rawOperator = transformation.getOperator
+    assertTrue(rawOperator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val windowOp = rawOperator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(windowOp.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(windowOp.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(windowOp.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      operator = windowOp,
+      keySelector = windowOp.getKeySelector,
+      keyType = BasicTypeInfo.STRING_TYPE_INFO,
+      element = ("hello", 1))
+  }
+
+
+  /**
+   * A `reduce` on sliding event-time windows with an explicitly set [[CountTrigger]] must be
+   * translated into a [[WindowOperator]] that carries that trigger, a
+   * [[SlidingEventTimeWindows]] assigner and reducing state. The operator is then driven once
+   * through the test harness.
+   */
+  @Test
+  def testReduceWithCustomTrigger() {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val elements = streamEnv.fromElements(("hello", 1), ("hello", 2))
+
+    val slidingWindows = elements
+      .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+    val reduced = slidingWindows
+      .trigger(CountTrigger.of(1))
+      .reduce(new DummyReducer)
+
+    val transformation = reduced.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val rawOperator = transformation.getOperator
+    assertTrue(rawOperator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val windowOp = rawOperator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(windowOp.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(windowOp.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(windowOp.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      operator = windowOp,
+      keySelector = windowOp.getKeySelector,
+      keyType = BasicTypeInfo.STRING_TYPE_INFO,
+      element = ("hello", 1))
+  }
+
+  /**
+   * A `fold` on sliding event-time windows with an explicitly set [[CountTrigger]] must be
+   * translated into a [[WindowOperator]] that carries that trigger, a
+   * [[SlidingEventTimeWindows]] assigner and folding state. The operator is then driven once
+   * through the test harness.
+   *
+   * The fold starts from `("", "", 1)`, so the operator emits `(String, String, Int)`; the casts
+   * and the harness call below state that output type explicitly.
+   */
+  @Test
+  def testFoldWithCustomTrigger() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .trigger(CountTrigger.of(1))
+      .fold(("", "", 1), new DummyFolder)
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  /**
+   * An `apply` on tumbling event-time windows with an explicitly set [[CountTrigger]] must be
+   * translated into a [[WindowOperator]] that carries that trigger, a
+   * [[TumblingEventTimeWindows]] assigner and list state. The operator is then driven once
+   * through the test harness.
+   */
+  @Test
+  def testApplyWithCustomTrigger() {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val elements = streamEnv.fromElements(("hello", 1), ("hello", 2))
+
+    // window function that re-emits every buffered element as a fresh tuple
+    val forwardAll = new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+      override def apply(
+          window: TimeWindow,
+          input: Iterable[(String, Int)],
+          out: Collector[(String, Int)]): Unit = {
+        for (record <- input) {
+          out.collect((record._1, record._2))
+        }
+      }
+    }
+
+    val windowed = elements
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .trigger(CountTrigger.of(1))
+      .apply(forwardAll)
+
+    val transformation = windowed.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val rawOperator = transformation.getOperator
+    assertTrue(rawOperator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val windowOp = rawOperator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(windowOp.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(windowOp.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(windowOp.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      operator = windowOp,
+      keySelector = windowOp.getKeySelector,
+      keyType = BasicTypeInfo.STRING_TYPE_INFO,
+      element = ("hello", 1))
+  }
+
+  /**
+   * A `reduce` on sliding event-time windows with a [[CountEvictor]] must be translated into an
+   * [[EvictingWindowOperator]] with an [[EventTimeTrigger]], the configured evictor, a
+   * [[SlidingEventTimeWindows]] assigner and list state. The operator is then driven once
+   * through the test harness.
+   */
+  @Test
+  def testReduceWithEvictor() {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val elements = streamEnv.fromElements(("hello", 1), ("hello", 2))
+
+    val slidingWindows = elements
+      .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+    val reduced = slidingWindows
+      .evictor(CountEvictor.of(100))
+      .reduce(new DummyReducer)
+
+    val transformation = reduced.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val rawOperator = transformation.getOperator
+    assertTrue(rawOperator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])
+
+    val evictingOp = rawOperator
+      .asInstanceOf[EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]
+
+    assertTrue(evictingOp.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(evictingOp.getEvictor.isInstanceOf[CountEvictor[_]])
+    assertTrue(evictingOp.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(evictingOp.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      operator = evictingOp,
+      keySelector = evictingOp.getKeySelector,
+      keyType = BasicTypeInfo.STRING_TYPE_INFO,
+      element = ("hello", 1))
+  }
+
+  /**
+   * A `fold` on sliding event-time windows with a [[CountEvictor]] must be translated into an
+   * [[EvictingWindowOperator]] with an [[EventTimeTrigger]], the configured evictor, a
+   * [[SlidingEventTimeWindows]] assigner and list state. The operator is then driven once
+   * through the test harness.
+   *
+   * The fold starts from `("", "", 1)`, so the operator emits `(String, String, Int)`; the casts,
+   * the output type handed to the operator and the harness call all state that type explicitly.
+   */
+  @Test
+  def testFoldWithEvictor() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
+      .fold(("", "", 1), new DummyFolder)
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[
+        EvictingWindowOperator[String, (String, Int), (String, String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    // The output type is set by hand before the operator is run in the harness.
+    // NOTE(review): presumably the fold function needs it to handle its initial value - confirm.
+    winOperator.setOutputType(window1.javaStream.getType, new ExecutionConfig)
+
+    processElementAndEnsureOutput[String, (String, Int), (String, String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  /**
+   * An `apply` on tumbling event-time windows with a [[CountEvictor]] must be translated into an
+   * [[EvictingWindowOperator]] with an [[EventTimeTrigger]], the configured evictor, a
+   * [[TumblingEventTimeWindows]] assigner and list state. The operator is then driven once
+   * through the test harness.
+   */
+  @Test
+  def testApplyWithEvictor() {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val elements = streamEnv.fromElements(("hello", 1), ("hello", 2))
+
+    // window function that re-emits every buffered element as a fresh tuple
+    val forwardAll = new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+      override def apply(
+          window: TimeWindow,
+          input: Iterable[(String, Int)],
+          out: Collector[(String, Int)]): Unit = {
+        for (record <- input) {
+          out.collect((record._1, record._2))
+        }
+      }
+    }
+
+    val windowed = elements
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
+      .apply(forwardAll)
+
+    val transformation = windowed.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val rawOperator = transformation.getOperator
+    assertTrue(rawOperator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])
+
+    val evictingOp = rawOperator
+      .asInstanceOf[EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]
+
+    assertTrue(evictingOp.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(evictingOp.getEvictor.isInstanceOf[CountEvictor[_]])
+    assertTrue(evictingOp.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(evictingOp.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      operator = evictingOp,
+      keySelector = evictingOp.getKeySelector,
+      keyType = BasicTypeInfo.STRING_TYPE_INFO,
+      element = ("hello", 1))
+  }
+
+
+ /**
+ * Ensure that we get some output from the given operator when pushing in an element and
+ * setting watermark and processing time to `Long.MaxValue`.
+ *
+ * The operator is wrapped in a [[KeyedOneInputStreamOperatorTestHarness]]. Processing time is
+ * first set to `0` and a watermark of `Long.MinValue` is sent, then `element` is pushed in
+ * with timestamp `0`, and finally both processing time and watermark are advanced to
+ * `Long.MaxValue`. The check passes when the harness has recorded at least three outputs.
+ *
+ * NOTE(review): the harness is closed only after the assertion has passed; a failing
+ * assertion skips `close()`.
+ *
+ * @param operator the operator to run inside the test harness
+ * @param keySelector key selector handed to the harness together with `keyType`
+ * @param keyType type information for the keys produced by `keySelector`
+ * @param element the single element that is pushed through the operator
+ * @tparam K key type
+ * @tparam IN input element type of the operator
+ * @tparam OUT output element type of the operator
+ */
+ @throws[Exception]
+ private def processElementAndEnsureOutput[K, IN, OUT](
+ operator: OneInputStreamOperator[IN, OUT],
+ keySelector: KeySelector[IN, K],
+ keyType: TypeInformation[K],
+ element: IN) {
+ val testHarness =
+ new KeyedOneInputStreamOperatorTestHarness[K, IN, OUT](operator, keySelector, keyType)
+
+ testHarness.open()
+
+ testHarness.setProcessingTime(0)
+ testHarness.processWatermark(Long.MinValue)
+
+ testHarness.processElement(new StreamRecord[IN](element, 0))
+
+ // provoke any processing-time/event-time triggers
+ testHarness.setProcessingTime(Long.MaxValue)
+ testHarness.processWatermark(Long.MaxValue)
-// ------------------------------------------------------------------------
-// UDFs
-// ------------------------------------------------------------------------
+ // we at least get the two watermarks and should also see an output element
+ assertTrue(testHarness.getOutput.size >= 3)
-class DummyReducer extends ReduceFunction[(String, Int)] {
- def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
- value1
+ testHarness.close()
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/aa220e48/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
new file mode 100644
index 0000000..ff97656
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescriptor, ReducingStateDescriptor}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.function.WindowFunction
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.streaming.runtime.operators.windowing.{AccumulatingProcessingTimeWindowOperator, AggregatingProcessingTimeWindowOperator, WindowOperator}
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.util.Collector
+import org.junit.Assert._
+import org.junit.{Ignore, Test}
+
+/**
+ * These tests verify that the API calls on [[WindowedStream]] that use the "time" shortcut
+ * instantiate the correct window operator.
+ *
+ * Only the translated operator is inspected; none of the tests executes the job.
+ */
+class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase {
+
+  /**
+   * These tests ensure that the fast aligned time windows operator is used if the
+   * conditions are right.
+   *
+   * Checks the `reduce` case, which is expected to use an
+   * [[AggregatingProcessingTimeWindowOperator]]. The test is currently disabled via `@Ignore`.
+   */
+  @Test
+  @Ignore
+  def testReduceFastTimeWindows(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(0)
+      .timeWindow(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))
+      .reduce(new DummyReducer())
+
+    val transform1 = window1.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator1 = transform1.getOperator
+
+    assertTrue(operator1.isInstanceOf[AggregatingProcessingTimeWindowOperator[_, _]])
+  }
+
+  /**
+   * These tests ensure that the fast aligned time windows operator is used if the
+   * conditions are right.
+   *
+   * Checks the `apply` case, which is expected to use an
+   * [[AccumulatingProcessingTimeWindowOperator]]. The test is currently disabled via `@Ignore`.
+   */
+  @Test
+  @Ignore
+  def testApplyFastTimeWindows(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(0)
+      .timeWindow(Time.minutes(1))
+      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
+        // deliberately a no-op: only the translation is checked
+        override def apply(
+            key: Tuple,
+            window: TimeWindow,
+            values: Iterable[(String, Int)],
+            out: Collector[(String, Int)]): Unit = ()
+      })
+
+    val transform1 = window1.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator1 = transform1.getOperator
+
+    assertTrue(operator1.isInstanceOf[AccumulatingProcessingTimeWindowOperator[_, _, _]])
+  }
+
+  /**
+   * A `reduce` on a sliding `timeWindow` must be translated into a [[WindowOperator]] with an
+   * [[EventTimeTrigger]], a [[SlidingEventTimeWindows]] assigner and reducing state.
+   */
+  @Test
+  def testReduceEventTimeWindows(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(0)
+      .timeWindow(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))
+      .reduce(new DummyReducer())
+
+    val transform1 = window1.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator1 = transform1.getOperator
+
+    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]])
+
+    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]]
+
+    assertTrue(winOperator1.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+  }
+
+  /**
+   * A `fold` on a sliding `timeWindow` must be translated into a [[WindowOperator]] with an
+   * [[EventTimeTrigger]], a [[SlidingEventTimeWindows]] assigner and folding state.
+   *
+   * The fold starts from `("", "", 1)`, so the transformation's output type is
+   * `(String, String, Int)`.
+   */
+  @Test
+  def testFoldEventTimeWindows(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(0)
+      .timeWindow(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))
+      .fold(("", "", 1), new DummyFolder())
+
+    val transform1 = window1.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, String, Int)]]
+
+    val operator1 = transform1.getOperator
+
+    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]])
+
+    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]]
+
+    assertTrue(winOperator1.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator1.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+  }
+
+  /**
+   * An `apply` on a sliding `timeWindow` must be translated into a [[WindowOperator]] with an
+   * [[EventTimeTrigger]], a [[SlidingEventTimeWindows]] assigner and list state.
+   */
+  @Test
+  def testApplyEventTimeWindows(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(0)
+      .timeWindow(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))
+      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow] {
+        // left unimplemented: this test never calls the function, it only checks the translation
+        override def apply(
+            key: Tuple,
+            window: TimeWindow,
+            input: Iterable[(String, Int)],
+            out: Collector[(String, Int)]): Unit = ???
+      })
+
+    val transform1 = window1.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator1 = transform1.getOperator
+
+    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]])
+
+    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]]
+
+    assertTrue(winOperator1.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator1.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+  }
+}