You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/11 11:15:47 UTC

[2/6] flink git commit: [FLINK-5237] Consolidate and harmonize Window Translation Tests

http://git-wip-us.apache.org/repos/asf/flink/blob/aa220e48/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index 6273e54..c738955 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -19,247 +19,1017 @@
 package org.apache.flink.streaming.api.scala
 
 
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.api.common.state.{ListStateDescriptor, ReducingStateDescriptor}
-import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.functions.{FoldFunction, RichFoldFunction, RichReduceFunction}
+import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescriptor, ReducingStateDescriptor}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala.function.{WindowFunction, AllWindowFunction}
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingEventTimeWindows, SlidingEventTimeWindows}
-import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
 import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, EventTimeTrigger, ProcessingTimeTrigger, Trigger}
+import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
 import org.apache.flink.streaming.runtime.operators.windowing._
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
 import org.apache.flink.util.Collector
-
 import org.junit.Assert._
-import org.junit.{Ignore, Test}
+import org.junit.Test
+
+/**
+  * These tests verify that the api calls on [[AllWindowedStream]] instantiate the correct
+  * window operator.
+  *
+  * We also create a test harness and push one element into the operator to verify
+  * that we get some output.
+  */
+class AllWindowTranslationTest {
+
+  /**
+    * .reduce() does not support [[RichReduceFunction]], since the reduce function is used
+    * internally in a [[org.apache.flink.api.common.state.ReducingState]].
+    */
+  @Test(expected = classOf[UnsupportedOperationException])
+  def testReduceWithRichReducerFails() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    source
+      .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .reduce(new RichReduceFunction[(String, Int)] {
+        override def reduce(value1: (String, Int), value2: (String, Int)) = null
+      })
 
-class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
+    fail("exception was not thrown")
+  }
 
   /**
-   * These tests ensure that the fast aligned time windows operator is used if the
-   * conditions are right.
-   *
-   * TODO: update once we have optimized aligned time windows operator for all-windows
-   */
-  @Ignore
+    * .fold() does not support [[RichFoldFunction]], since the fold function is used internally
+    * in a [[org.apache.flink.api.common.state.FoldingState]].
+    */
+  @Test(expected = classOf[UnsupportedOperationException])
+  def testFoldWithRichFolderFails() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    source
+      .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .fold(("", 0), new RichFoldFunction[(String, Int), (String, Int)] {
+        override def fold(accumulator: (String, Int), value: (String, Int)) = null
+      })
+
+    fail("exception was not thrown")
+  }
+
+  @Test
+  def testSessionWithFoldFails() {
+    // verify that fold does not work with merging windows
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val windowedStream = env.fromElements("Hello", "Ciao")
+      .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)))
+
+    try
+      windowedStream.fold("", new FoldFunction[String, String]() {
+        @throws[Exception]
+        def fold(accumulator: String, value: String): String = accumulator
+      })
+
+    catch {
+      case _: UnsupportedOperationException =>
+        // expected
+        // use a catch to ensure that the exception is thrown by the fold
+        return
+    }
+
+    fail("The fold call should fail.")
+  }
+
+  @Test
+  def testMergingAssignerWithNonMergingTriggerFails() {
+    // verify that we check for trigger compatibility
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val windowedStream = env.fromElements("Hello", "Ciao")
+      .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)))
+
+    try
+      windowedStream.trigger(new Trigger[String, TimeWindow]() {
+        def onElement(
+                       element: String,
+                       timestamp: Long,
+                       window: TimeWindow,
+                       ctx: Trigger.TriggerContext) = null
+
+        def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext) = null
+
+        def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext) = null
+
+        override def canMerge = false
+
+        def clear(window: TimeWindow, ctx: Trigger.TriggerContext) {}
+      })
+
+    catch {
+      case _: UnsupportedOperationException =>
+        // expected
+        // use a catch to ensure that the exception is thrown by the trigger call
+        return
+    }
+
+    fail("The trigger call should fail.")
+  }
+
+  @Test
+  def testReduceEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .reduce(new DummyReducer)
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
   @Test
-  def testFastTimeWindows(): Unit = {
+  def testReduceProcessingTime() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
 
     val source = env.fromElements(("hello", 1), ("hello", 2))
 
-    val reducer = new DummyReducer
+    val window1 = source
+      .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .reduce(new DummyReducer)
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testReduceEventTimeWithScalaFunction() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
 
     val window1 = source
-      .windowAll(SlidingEventTimeWindows.of(
-        Time.of(1, TimeUnit.SECONDS),
-        Time.of(100, TimeUnit.MILLISECONDS)))
-      .reduce(reducer)
+      .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .reduce( (x, _) => x )
 
-    val transform1 = window1.javaStream.getTransformation
+    val transform = window1
+      .javaStream
+      .getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
-    val operator1 = transform1.getOperator
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
 
-    assertTrue(operator1.isInstanceOf[AggregatingProcessingTimeWindowOperator[_, _]])
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
 
-    val window2 = source
-      .keyBy(0)
-      .windowAll(SlidingEventTimeWindows.of(
-        Time.of(1, TimeUnit.SECONDS),
-        Time.of(100, TimeUnit.MILLISECONDS)))
-      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
-        def apply(
-            window: TimeWindow,
-            values: Iterable[(String, Int)],
-            out: Collector[(String, Int)]) { }
-      })
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testReduceWithWindowFunctionEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
 
-    val transform2 = window2.javaStream.getTransformation
+    val window1 = source
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
+      .reduce(
+        new DummyReducer, new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+          override def apply(
+              window: TimeWindow,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
-    val operator2 = transform2.getOperator
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
 
-    assertTrue(operator2.isInstanceOf[AccumulatingProcessingTimeWindowOperator[_, _, _]])
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
   }
 
   @Test
-  def testNonEvicting(): Unit = {
+  def testReduceWithWindowFunctionProcessingTime() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
 
     val source = env.fromElements(("hello", 1), ("hello", 2))
 
-    val reducer = new DummyReducer
+    val window1 = source
+      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
+      .reduce(
+        new DummyReducer, new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+          override def apply(
+              window: TimeWindow,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testApplyWithPreReducerEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
 
     val window1 = source
-      .windowAll(SlidingEventTimeWindows.of(
-        Time.of(1, TimeUnit.SECONDS),
-        Time.of(100, TimeUnit.MILLISECONDS)))
-      .trigger(CountTrigger.of(100))
-      .reduce(reducer)
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
+      .apply(
+        new DummyReducer, new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+          override def apply(
+              window: TimeWindow,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
+        })
 
-    val transform1 = window1.javaStream.getTransformation
+    val transform = window1
+      .javaStream
+      .getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
-    val operator1 = transform1.getOperator
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
 
-    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]])
-    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]]
-    assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
-    assertTrue(
-      winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
 
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
 
-    val window2 = source
-      .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-      .trigger(CountTrigger.of(100))
-      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
-      def apply(
-                    window: TimeWindow,
-                    values: Iterable[(String, Int)],
-                    out: Collector[(String, Int)]) { }
-    })
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
 
-    val transform2 = window2.javaStream.getTransformation
+  @Test
+  def testReduceWithWindowFunctionEventTimeWithScalaFunction() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
+      .reduce(
+        { (x, _) => x },
+        { (_, in, out: Collector[(String, Int)]) => in foreach { x => out.collect(x)} })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
-    val operator2 = transform2.getOperator
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
 
-    assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]])
-    val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _, _]]
-    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
-    assertTrue(winOperator2.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
   }
 
+
   @Test
-  def testEvicting(): Unit = {
+  def testFoldEventTime() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
 
     val source = env.fromElements(("hello", 1), ("hello", 2))
 
-    val reducer = new DummyReducer
+    val window1 = source
+      .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .fold(("", "", 1), new DummyFolder)
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testFoldProcessingTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
 
     val window1 = source
-      .windowAll(SlidingProcessingTimeWindows.of(
-        Time.of(1, TimeUnit.SECONDS),
-        Time.of(100, TimeUnit.MILLISECONDS)))
-      .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
-      .reduce(reducer)
+      .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .fold(("", "", 1), new DummyFolder)
 
-    val transform1 = window1.javaStream.getTransformation
+    val transform = window1
+      .javaStream
+      .getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
-    val operator1 = transform1.getOperator
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
 
-    assertTrue(operator1.isInstanceOf[EvictingWindowOperator[_, _, _, _]])
-    val winOperator1 = operator1.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
-    assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger])
-    assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
-    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
-    assertTrue(winOperator1.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testFoldEventTimeWithScalaFunction() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
 
+    val source = env.fromElements(("hello", 1), ("hello", 2))
 
-    val window2 = source
-      .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-      .trigger(CountTrigger.of(100))
-      .evictor(CountEvictor.of(1000))
-      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
-      def apply(
-                    window: TimeWindow,
-                    values: Iterable[(String, Int)],
-                    out: Collector[(String, Int)]) { }
-    })
+    val window1 = source
+      .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .fold(("", "", 1)) { (acc, _) => acc }
 
-    val transform2 = window2.javaStream.getTransformation
+    val transform = window1
+      .javaStream
+      .getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
-    val operator2 = transform2.getOperator
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
 
-    assertTrue(operator2.isInstanceOf[EvictingWindowOperator[_, _, _, _]])
-    val winOperator2 = operator2.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
-    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
-    assertTrue(winOperator2.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
   }
 
+
   @Test
-  def testPreReduce(): Unit = {
+  def testFoldWithWindowFunctionEventTime() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
 
     val source = env.fromElements(("hello", 1), ("hello", 2))
 
-    val reducer = new DummyReducer
+    val window1 = source
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .fold(
+        ("", "", 1),
+        new DummyFolder,
+        new AllWindowFunction[(String, String, Int), (String, Int), TimeWindow] {
+          override def apply(
+              window: TimeWindow,
+              input: Iterable[(String, String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._3))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testFoldWithWindowFunctionProcessingTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
 
     val window1 = source
-      .keyBy(0)
-      .window(SlidingEventTimeWindows.of(
-        Time.of(1, TimeUnit.SECONDS),
-        Time.of(100, TimeUnit.MILLISECONDS)))
-      .trigger(CountTrigger.of(100))
-      .reduce(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
-        def apply(
-                   tuple: Tuple,
-                   window: TimeWindow,
-                   values: Iterable[(String, Int)],
-                   out: Collector[(String, Int)]) { }
-      })
+      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .fold(
+        ("", "", 1),
+        new DummyFolder,
+        new AllWindowFunction[(String, String, Int), (String, Int), TimeWindow] {
+          override def apply(
+              window: TimeWindow,
+              input: Iterable[(String, String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._3))}
+        })
 
-    val transform1 = window1.javaStream.getTransformation
+    val transform = window1
+      .javaStream
+      .getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
-    val operator1 = transform1.getOperator
-
-    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]])
-    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]]
-    assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
-    assertTrue(
-      winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
-
-
-    val window2 = source
-      .keyBy(0)
-      .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-      .trigger(CountTrigger.of(100))
-      .reduce(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
-        def apply(
-                   tuple: Tuple,
-                   window: TimeWindow,
-                   values: Iterable[(String, Int)],
-                   out: Collector[(String, Int)]) { }
-      })
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testApplyWithPreFolderEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    // the job is only translated here; env.execute() is never called in this test
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+    // apply() with an initial value and a DummyFolder placed in front of the window function
+    val window1 = source
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .apply(
+        ("", "", 1),
+        new DummyFolder,
+        new AllWindowFunction[(String, String, Int), (String, String, Int), TimeWindow] {
+          override def apply(
+              window: TimeWindow,
+              input: Iterable[(String, String, Int)],
+              out: Collector[(String, String, Int)]): Unit =
+            input foreach {x => out.collect((x._1, x._2, x._3))}
+        })
 
-    val transform2 = window2.javaStream.getTransformation
+    val transform = window1
+      .javaStream
+      .getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
-    val operator2 = transform2.getOperator
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+    // NOTE(review): OUT is cast to (String, Int) though the function emits 3-tuples; confirm
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+    // expected translation: event-time trigger, tumbling event-time windows, folding state
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
 
-    assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]])
-    val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _, _]]
-    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
-    assertTrue(
-      winOperator2.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
   }
 
-}
+  /**
+    * `fold` given a Scala fold closure plus a Scala window closure must become a
+    * [[WindowOperator]] using an event-time trigger, tumbling windows and folding state.
+    */
+  @Test
+  def testFoldWithWindowFunctionEventTimeWithScalaFunction() {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    val tuples = streamEnv.fromElements(("hello", 1), ("hello", 2))
+
+    val folded = tuples
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .fold(
+        ("", "", 1),
+        { (accumulator: (String, String, Int), _) => accumulator },
+        { (_, values: Iterable[(String, String, Int)], out: Collector[(String, Int)]) =>
+          values.foreach(v => out.collect((v._1, v._3)))
+        })
+
+    val translated = folded.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+      .getOperator
+    assertTrue(translated.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val windowOperator = translated
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+    assertTrue(windowOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(windowOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(windowOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+    // beyond the structural checks, feeding one element must make the operator emit output
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      windowOperator,
+      windowOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+
+  /**
+    * `apply` with an [[AllWindowFunction]] on tumbling event-time windows must translate into
+    * a [[WindowOperator]] using an event-time trigger and list state.
+    */
+  @Test
+  def testApplyEventTime() {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    val tuples = streamEnv.fromElements(("hello", 1), ("hello", 2))
+
+    val applied = tuples
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+        override def apply(
+            window: TimeWindow,
+            values: Iterable[(String, Int)],
+            out: Collector[(String, Int)]): Unit =
+          values.foreach(v => out.collect((v._1, v._2)))
+      })
+
+    val translated = applied.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+      .getOperator
+    assertTrue(translated.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val windowOperator = translated
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+    assertTrue(windowOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(windowOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(windowOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    // beyond the structural checks, feeding one element must make the operator emit output
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      windowOperator,
+      windowOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  /**
+    * `apply` with an [[AllWindowFunction]] on tumbling processing-time windows must translate
+    * into a [[WindowOperator]] using a processing-time trigger and list state.
+    */
+  @Test
+  def testApplyProcessingTimeTime() {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+    val tuples = streamEnv.fromElements(("hello", 1), ("hello", 2))
+
+    val applied = tuples
+      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+        override def apply(
+            window: TimeWindow,
+            values: Iterable[(String, Int)],
+            out: Collector[(String, Int)]): Unit =
+          values.foreach(v => out.collect((v._1, v._2)))
+      })
+
+    val translated = applied.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+      .getOperator
+    assertTrue(translated.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val windowOperator = translated
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+    assertTrue(windowOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(windowOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(windowOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    // beyond the structural checks, feeding one element must make the operator emit output
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      windowOperator,
+      windowOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  /**
+    * `apply` given a plain Scala closure on tumbling event-time windows must translate into
+    * a [[WindowOperator]] using an event-time trigger and list state.
+    */
+  @Test
+  def testApplyEventTimeWithScalaFunction() {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    val tuples = streamEnv.fromElements(("hello", 1), ("hello", 2))
+
+    val applied = tuples
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .apply { (window, values, out: Collector[(String, Int)]) =>
+        values.foreach(v => out.collect(v))
+      }
+
+    val translated = applied.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+      .getOperator
+    assertTrue(translated.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val windowOperator = translated
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+    assertTrue(windowOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(windowOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(windowOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    // beyond the structural checks, feeding one element must make the operator emit output
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      windowOperator,
+      windowOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+
+  /**
+    * `reduce` on sliding event-time windows with an explicit [[CountTrigger]] must translate
+    * into a [[WindowOperator]] that uses that trigger and reducing state.
+    */
+  @Test
+  def testReduceWithCustomTrigger() {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    val tuples = streamEnv.fromElements(("hello", 1), ("hello", 2))
+
+    val reduced = tuples
+      .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .trigger(CountTrigger.of(1))
+      .reduce(new DummyReducer)
+
+    val translated = reduced.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+      .getOperator
+    assertTrue(translated.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val windowOperator = translated
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+    assertTrue(windowOperator.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(windowOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(windowOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+    // beyond the structural checks, feeding one element must make the operator emit output
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      windowOperator,
+      windowOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  /**
+    * `fold` on sliding event-time windows with an explicit [[CountTrigger]] must translate
+    * into a [[WindowOperator]] that uses that trigger and folding state.
+    */
+  @Test
+  def testFoldWithCustomTrigger() {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    val tuples = streamEnv.fromElements(("hello", 1), ("hello", 2))
+
+    val folded = tuples
+      .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .trigger(CountTrigger.of(1))
+      .fold(("", "", 1), new DummyFolder)
+
+    val translated = folded.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+      .getOperator
+    assertTrue(translated.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val windowOperator = translated
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+    assertTrue(windowOperator.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(windowOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(windowOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+    // beyond the structural checks, feeding one element must make the operator emit output
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      windowOperator,
+      windowOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  /**
+    * `apply` on tumbling event-time windows with an explicit [[CountTrigger]] must translate
+    * into a [[WindowOperator]] that uses that trigger and list state.
+    */
+  @Test
+  def testApplyWithCustomTrigger() {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    val tuples = streamEnv.fromElements(("hello", 1), ("hello", 2))
+
+    val applied = tuples
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .trigger(CountTrigger.of(1))
+      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+        override def apply(
+            window: TimeWindow,
+            values: Iterable[(String, Int)],
+            out: Collector[(String, Int)]): Unit =
+          values.foreach(v => out.collect((v._1, v._2)))
+      })
+
+    val translated = applied.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+      .getOperator
+    assertTrue(translated.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val windowOperator = translated
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+    assertTrue(windowOperator.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(windowOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(windowOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    // beyond the structural checks, feeding one element must make the operator emit output
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      windowOperator,
+      windowOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  /**
+    * `reduce` on sliding event-time windows with a [[CountEvictor]] must translate into an
+    * [[EvictingWindowOperator]] that keeps that evictor, the event-time trigger and list state.
+    */
+  @Test
+  def testReduceWithEvictor() {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    val tuples = streamEnv.fromElements(("hello", 1), ("hello", 2))
+
+    val reduced = tuples
+      .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
+      .reduce(new DummyReducer)
+
+    val translated = reduced.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+      .getOperator
+    assertTrue(translated.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])
+
+    val windowOperator = translated
+      .asInstanceOf[EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]
+
+    assertTrue(windowOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(windowOperator.getEvictor.isInstanceOf[CountEvictor[_]])
+    assertTrue(windowOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(windowOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    // beyond the structural checks, feeding one element must make the operator emit output
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      windowOperator,
+      windowOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  /**
+    * `fold` on sliding event-time windows with a [[CountEvictor]] must translate into an
+    * [[EvictingWindowOperator]] that keeps that evictor, the event-time trigger and list state.
+    */
+  @Test
+  def testFoldWithEvictor() {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    val tuples = streamEnv.fromElements(("hello", 1), ("hello", 2))
+
+    val folded = tuples
+      .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
+      .fold(("", "", 1), new DummyFolder)
+
+    val translated = folded.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+      .getOperator
+    assertTrue(translated.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])
+
+    val windowOperator = translated
+      .asInstanceOf[EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]
+    assertTrue(windowOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(windowOperator.getEvictor.isInstanceOf[CountEvictor[_]])
+    assertTrue(windowOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(windowOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    // this test additionally hands the stream's output type to the operator before running it
+    windowOperator.setOutputType(
+      folded.javaStream.getType.asInstanceOf[TypeInformation[(String, Int)]],
+      new ExecutionConfig)
+
+    // beyond the structural checks, feeding one element must make the operator emit output
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      windowOperator,
+      windowOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  /**
+    * `apply` on tumbling event-time windows with a [[CountEvictor]] must translate into an
+    * [[EvictingWindowOperator]] that keeps that evictor, the event-time trigger and list state.
+    */
+  @Test
+  def testApplyWithEvictor() {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    val tuples = streamEnv.fromElements(("hello", 1), ("hello", 2))
+
+    val applied = tuples
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
+      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+        override def apply(
+            window: TimeWindow,
+            values: Iterable[(String, Int)],
+            out: Collector[(String, Int)]): Unit =
+          values.foreach(v => out.collect((v._1, v._2)))
+      })
+
+    val translated = applied.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+      .getOperator
+    assertTrue(translated.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])
+
+    val windowOperator = translated
+      .asInstanceOf[EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]
+    assertTrue(windowOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(windowOperator.getEvictor.isInstanceOf[CountEvictor[_]])
+    assertTrue(windowOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(windowOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    // beyond the structural checks, feeding one element must make the operator emit output
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      windowOperator,
+      windowOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+
+  /**
+    * Pushes `element` through `operator` inside a keyed test harness, then moves processing
+    * time and the watermark to `Long.MaxValue` and asserts that at least three records were
+    * emitted. The harness is closed even when processing or the assertion fails.
+    */
+  @throws[Exception]
+  private def processElementAndEnsureOutput[K, IN, OUT](
+      operator: OneInputStreamOperator[IN, OUT],
+      keySelector: KeySelector[IN, K],
+      keyType: TypeInformation[K],
+      element: IN): Unit = {
+    val testHarness =
+      new KeyedOneInputStreamOperatorTestHarness[K, IN, OUT](operator, keySelector, keyType)
+    testHarness.open()
+
+    try {
+      testHarness.setProcessingTime(0)
+      testHarness.processWatermark(Long.MinValue)
+      testHarness.processElement(new StreamRecord[IN](element, 0))
+
+      // provoke any processing-time/event-time triggers
+      testHarness.setProcessingTime(Long.MaxValue)
+      testHarness.processWatermark(Long.MaxValue)
 
-// ------------------------------------------------------------------------
-//  UDFs
-// ------------------------------------------------------------------------
+      // we at least get the two watermarks and should also see an output element
+      assertTrue(testHarness.getOutput.size >= 3)
 
-class DummyReducer extends ReduceFunction[(String, Int)] {
-  def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
-    value1
+    } finally testHarness.close()
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/aa220e48/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
new file mode 100644
index 0000000..ff97656
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescriptor, ReducingStateDescriptor}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.function.WindowFunction
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.streaming.runtime.operators.windowing.{AccumulatingProcessingTimeWindowOperator, AggregatingProcessingTimeWindowOperator, WindowOperator}
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.util.Collector
+import org.junit.Assert._
+import org.junit.{Ignore, Test}
+
+/**
+  * Checks that windowed streams created through the `timeWindow` shortcut are translated
+  * into the expected window operator.
+  */
+class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase {
+
+  /**
+    * `reduce` on a sliding time window is expected to be translated into the fast aligned
+    * [[AggregatingProcessingTimeWindowOperator]] when the conditions are right.
+    * Currently ignored.
+    */
+  @Test
+  @Ignore
+  def testReduceFastTimeWindows(): Unit = {
+    // no time characteristic is configured, so the environment's default applies
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    val tuples = streamEnv.fromElements(("hello", 1), ("hello", 2))
+
+    val reduced = tuples
+      .keyBy(0)
+      .timeWindow(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))
+      .reduce(new DummyReducer())
+
+    val translated = reduced.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+      .getOperator
+
+    assertTrue(translated.isInstanceOf[AggregatingProcessingTimeWindowOperator[_, _]])
+  }
+
+  /**
+    * `apply` on a one-minute time window under processing time is expected to be translated
+    * into the fast aligned [[AccumulatingProcessingTimeWindowOperator]] when the conditions
+    * are right. Currently ignored.
+    */
+  @Test
+  @Ignore
+  def testApplyFastTimeWindows(): Unit = {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+    val tuples = streamEnv.fromElements(("hello", 1), ("hello", 2))
+
+    // the window function is a no-op: only the translation is inspected, nothing is executed
+    val applied = tuples
+      .keyBy(0)
+      .timeWindow(Time.minutes(1))
+      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
+        def apply(
+            key: Tuple,
+            window: TimeWindow,
+            values: Iterable[(String, Int)],
+            out: Collector[(String, Int)]): Unit = {}
+      })
+
+    val translated = applied.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+      .getOperator
+
+    assertTrue(translated.isInstanceOf[AccumulatingProcessingTimeWindowOperator[_, _, _]])
+  }
+
+  /**
+    * `reduce` on a sliding time window under ingestion time must be translated into a generic
+    * [[WindowOperator]] with an event-time trigger, sliding event-time windows, reducing state.
+    */
+  @Test
+  def testReduceEventTimeWindows(): Unit = {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    val tuples = streamEnv.fromElements(("hello", 1), ("hello", 2))
+
+    val reduced = tuples
+      .keyBy(0)
+      .timeWindow(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))
+      .reduce(new DummyReducer())
+
+    val translated = reduced.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+      .getOperator
+    assertTrue(translated.isInstanceOf[WindowOperator[_, _, _, _, _]])
+
+    val windowOperator = translated.asInstanceOf[WindowOperator[_, _, _, _, _]]
+    assertTrue(windowOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(windowOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(windowOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+  }
+
+  /**
+    * `fold` on a sliding time window under ingestion time must be translated into a generic
+    * [[WindowOperator]] with an event-time trigger, sliding event-time windows, folding state.
+    */
+  @Test
+  def testFoldEventTimeWindows(): Unit = {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    val tuples = streamEnv.fromElements(("hello", 1), ("hello", 2))
+
+    val folded = tuples
+      .keyBy(0)
+      .timeWindow(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))
+      .fold(("", "", 1), new DummyFolder())
+
+    val translated = folded.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+      .getOperator
+    assertTrue(translated.isInstanceOf[WindowOperator[_, _, _, _, _]])
+
+    val windowOperator = translated.asInstanceOf[WindowOperator[_, _, _, _, _]]
+    assertTrue(windowOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(windowOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(windowOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+  }
+
+  /**
+    * `apply` on a sliding time window under ingestion time must be translated into a generic
+    * [[WindowOperator]] with an event-time trigger, sliding event-time windows and list state.
+    */
+  @Test
+  def testApplyEventTimeWindows(): Unit = {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    val tuples = streamEnv.fromElements(("hello", 1), ("hello", 2))
+
+    val applied = tuples
+      .keyBy(0)
+      .timeWindow(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))
+      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow] {
+        override def apply(
+            key: Tuple,
+            window: TimeWindow,
+            values: Iterable[(String, Int)],
+            out: Collector[(String, Int)]): Unit = ???
+      })
+
+    val translated = applied.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+      .getOperator
+    assertTrue(translated.isInstanceOf[WindowOperator[_, _, _, _, _]])
+
+    val windowOperator = translated.asInstanceOf[WindowOperator[_, _, _, _, _]]
+    assertTrue(windowOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(windowOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(windowOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+  }
+}