Posted to commits@flink.apache.org by al...@apache.org on 2017/02/17 16:24:11 UTC

[1/8] flink git commit: [FLINK-5237] Consolidate and harmonize Window Translation Tests

Repository: flink
Updated Branches:
  refs/heads/master 09fe4b0b8 -> 5368a7d32


http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index bd3fe3d..600645b 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -24,8 +24,8 @@ import org.apache.flink.api.common.state.{AggregatingStateDescriptor, FoldingSta
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator
-import org.apache.flink.streaming.api.scala.function.WindowFunction
+import org.apache.flink.streaming.api.operators.{OneInputStreamOperator, OutputTypeConfigurable}
+import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
@@ -367,6 +367,90 @@ class WindowTranslationTest {
   }
 
   @Test
+  def testReduceWithProcessWindowFunctionEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
+      .reduce(
+        new DummyReducer,
+        new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def process(
+              key: String,
+              window: Context,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testReduceWithProcessWindowFunctionProcessingTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
+      .reduce(
+        new DummyReducer,
+        new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def process(
+              key: String,
+              window: Context,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
   def testApplyWithPreReducerEventTime() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -408,6 +492,50 @@ class WindowTranslationTest {
   }
 
   @Test
+  def testApplyWithPreReducerAndEvictor() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
+      .evictor(CountEvictor.of(100))
+      .apply(
+        new DummyReducer,
+        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def apply(
+              key: String,
+              window: TimeWindow,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+
+  @Test
   def testReduceWithWindowFunctionEventTimeWithScalaFunction() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -584,6 +712,74 @@ class WindowTranslationTest {
   }
 
   @Test
+  def testAggregateWithProcessWindowFunctionEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
+      .aggregate(new DummyAggregator(), new TestProcessWindowFunction())
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[AggregatingStateDescriptor[_, _, _]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testAggregateWithProcessWindowFunctionProcessingTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
+      .aggregate(new DummyAggregator(), new TestProcessWindowFunction())
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[AggregatingStateDescriptor[_, _, _]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
   def testAggregateWithWindowFunctionEventTimeWithScalaFunction() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -814,7 +1010,7 @@ class WindowTranslationTest {
   }
 
   @Test
-  def testApplyWithPreFolderEventTime() {
+  def testFoldWithProcessWindowFunctionEventTime() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
 
@@ -823,16 +1019,15 @@ class WindowTranslationTest {
     val window1 = source
       .keyBy(_._1)
       .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
-      .apply(
+      .fold(
         ("", "", 1),
         new DummyFolder,
-        new WindowFunction[(String, String, Int), (String, String, Int), String, TimeWindow] {
-          override def apply(
+        new ProcessWindowFunction[(String, String, Int), (String, Int), String, TimeWindow] {
+          override def process(
               key: String,
-              window: TimeWindow,
+              window: Context,
               input: Iterable[(String, String, Int)],
-              out: Collector[(String, String, Int)]): Unit =
-            input foreach {x => out.collect((x._1, x._2, x._3))}
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._3))}
         })
 
     val transform = window1
@@ -858,20 +1053,24 @@ class WindowTranslationTest {
   }
 
   @Test
-  def testFoldWithWindowFunctionEventTimeWithScalaFunction() {
+  def testFoldWithProcessWindowFunctionProcessingTime() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
 
     val source = env.fromElements(("hello", 1), ("hello", 2))
 
     val window1 = source
       .keyBy(_._1)
-      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .window(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
       .fold(
         ("", "", 1),
-        { (acc: (String, String, Int), _) => acc },
-        { (_, _, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) =>
-          in foreach { x => out.collect((x._1, x._3)) }
+        new DummyFolder,
+        new ProcessWindowFunction[(String, String, Int), (String, Int), String, TimeWindow] {
+          override def process(
+              key: String,
+              window: Context,
+              input: Iterable[(String, String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._3))}
         })
 
     val transform = window1
@@ -885,8 +1084,8 @@ class WindowTranslationTest {
     val winOperator = operator
       .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
 
-    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
-    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
     assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
 
     processElementAndEnsureOutput[String, (String, Int), (String, Int)](
@@ -896,12 +1095,8 @@ class WindowTranslationTest {
       ("hello", 1))
   }
 
-  // --------------------------------------------------------------------------
-  //  apply() tests
-  // --------------------------------------------------------------------------
-
   @Test
-  def testApplyEventTime() {
+  def testApplyWithPreFolderEventTime() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
 
@@ -911,12 +1106,15 @@ class WindowTranslationTest {
       .keyBy(_._1)
       .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
       .apply(
-        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+        ("", "", 1),
+        new DummyFolder,
+        new WindowFunction[(String, String, Int), (String, String, Int), String, TimeWindow] {
           override def apply(
               key: String,
               window: TimeWindow,
-              input: Iterable[(String, Int)],
-              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))}
+              input: Iterable[(String, String, Int)],
+              out: Collector[(String, String, Int)]): Unit =
+            input foreach {x => out.collect((x._1, x._2, x._3))}
         })
 
     val transform = window1
@@ -932,7 +1130,7 @@ class WindowTranslationTest {
 
     assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
     assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
-    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
 
     processElementAndEnsureOutput[String, (String, Int), (String, Int)](
       winOperator,
@@ -942,22 +1140,26 @@ class WindowTranslationTest {
   }
 
   @Test
-  def testApplyProcessingTimeTime() {
+  def testApplyWithPreFolderAndEvictor() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
 
     val source = env.fromElements(("hello", 1), ("hello", 2))
 
     val window1 = source
       .keyBy(_._1)
-      .window(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
       .apply(
-        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+        ("", "", 1),
+        new DummyFolder,
+        new WindowFunction[(String, String, Int), (String, String, Int), String, TimeWindow] {
           override def apply(
-              key: String,
-              window: TimeWindow,
-              input: Iterable[(String, Int)],
-              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))}
+              key: String,
+              window: TimeWindow,
+              input: Iterable[(String, String, Int)],
+              out: Collector[(String, String, Int)]): Unit =
+            input foreach {x => out.collect((x._1, x._2, x._3))}
         })
 
     val transform = window1
@@ -971,8 +1173,8 @@ class WindowTranslationTest {
     val winOperator = operator
       .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
 
-    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
-    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
     assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
 
     processElementAndEnsureOutput[String, (String, Int), (String, Int)](
@@ -983,7 +1185,7 @@ class WindowTranslationTest {
   }
 
   @Test
-  def testApplyEventTimeWithScalaFunction() {
+  def testFoldWithWindowFunctionEventTimeWithScalaFunction() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
 
@@ -992,9 +1194,12 @@ class WindowTranslationTest {
     val window1 = source
       .keyBy(_._1)
       .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
-      .apply { (key, window, in, out: Collector[(String, Int)]) =>
-        in foreach { x => out.collect(x)}
-      }
+      .fold(
+        ("", "", 1),
+        { (acc: (String, String, Int), _) => acc },
+        { (_, _, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) =>
+          in foreach { x => out.collect((x._1, x._3)) }
+        })
 
     val transform = window1
       .javaStream
@@ -1009,7 +1214,211 @@ class WindowTranslationTest {
 
     assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
     assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
-    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  // --------------------------------------------------------------------------
+  //  apply() tests
+  // --------------------------------------------------------------------------
+
+  @Test
+  def testApplyEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .apply(
+        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def apply(
+              key: String,
+              window: TimeWindow,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testApplyProcessingTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .apply(
+        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def apply(
+              key: String,
+              window: TimeWindow,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testProcessEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .process(
+        new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def process(
+              key: String,
+              window: Context,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testProcessProcessingTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .process(
+        new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def process(
+              key: String,
+              window: Context,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testApplyEventTimeWithScalaFunction() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .apply { (key, window, in, out: Collector[(String, Int)]) =>
+        in foreach { x => out.collect(x)}
+      }
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
 
     processElementAndEnsureOutput[String, (String, Int), (String, Int)](
       winOperator,
@@ -1132,6 +1541,49 @@ class WindowTranslationTest {
   }
 
   @Test
+  def testProcessWithCustomTrigger() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .trigger(CountTrigger.of(1))
+      .process(
+        new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def process(
+              key: String,
+              window: Context,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+
+  @Test
   def testReduceWithEvictor() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -1169,6 +1621,113 @@ class WindowTranslationTest {
   }
 
   @Test
+  def testReduceWithEvictorAndProcessFunction() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
+      .reduce(new DummyReducer, new TestProcessWindowFunction)
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[
+      EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testAggregateWithEvictor() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
+      .aggregate(new DummyAggregator())
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testAggregateWithEvictorAndProcessFunction() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
+      .aggregate(new DummyAggregator(), new TestProcessWindowFunction)
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
   def testFoldWithEvictor() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -1210,6 +1769,48 @@ class WindowTranslationTest {
   }
 
   @Test
+  def testFoldWithEvictorAndProcessFunction() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
+      .fold(("", "", 1), new DummyFolder, new TestFoldProcessWindowFunction)
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[
+      EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    winOperator.setOutputType(
+      window1.javaStream.getType.asInstanceOf[TypeInformation[(String, Int)]],
+      new ExecutionConfig)
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+
+  @Test
   def testApplyWithEvictor() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -1252,6 +1853,48 @@ class WindowTranslationTest {
       ("hello", 1))
   }
 
+  @Test
+  def testProcessWithEvictor() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
+      .process(
+        new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def process(
+              key: String,
+              window: Context,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
 
   /**
     * Ensure that we get some output from the given operator when pushing in an element and
@@ -1266,6 +1909,12 @@ class WindowTranslationTest {
     val testHarness =
       new KeyedOneInputStreamOperatorTestHarness[K, IN, OUT](operator, keySelector, keyType)
 
+    if (operator.isInstanceOf[OutputTypeConfigurable[String]]) {
+      // use a dummy type since window functions just need the ExecutionConfig
+      // this is also only needed for Fold, which we're getting rid of soon.
+      operator.asInstanceOf[OutputTypeConfigurable[String]]
+        .setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig)
+    }
     testHarness.open()
 
     testHarness.setProcessingTime(0)
@@ -1319,16 +1968,45 @@ class DummyRichAggregator extends RichAggregateFunction[(String, Int), (String,
   override def add(value: (String, Int), accumulator: (String, Int)): Unit = ()
 }
 
-class TestWindowFunction extends WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+class TestWindowFunction
+  extends WindowFunction[(String, Int), (String, String, Int), String, TimeWindow] {
 
   override def apply(
       key: String,
       window: TimeWindow,
       input: Iterable[(String, Int)],
-      out: Collector[(String, Int)]): Unit = {
+      out: Collector[(String, String, Int)]): Unit = {
+
+    input.foreach(e => out.collect((e._1, e._1, e._2)))
+  }
+}
+
+class TestProcessWindowFunction
+  extends ProcessWindowFunction[(String, Int), (String, String, Int), String, TimeWindow] {
+
+  override def process(
+      key: String,
+      window: Context,
+      input: Iterable[(String, Int)],
+      out: Collector[(String, String, Int)]): Unit = {
 
-    input.foreach(out.collect)
+    input.foreach(e => out.collect((e._1, e._1, e._2)))
   }
 }
 
+class TestFoldProcessWindowFunction
+  extends ProcessWindowFunction[(String, String, Int), (String, Int), String, TimeWindow] {
+
+  override def process(
+      key: String,
+      window: Context,
+      input: Iterable[(String, String, Int)],
+      out: Collector[(String, Int)]): Unit = {
+
+    input.foreach(e => out.collect((e._1, e._3)))
+  }
+}
+
+
+
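
Every test above finishes with a call to processElementAndEnsureOutput; only a fragment of that helper is visible in the hunks further down (it builds a KeyedOneInputStreamOperatorTestHarness, calls setOutputType for operators that are OutputTypeConfigurable, opens the harness and sets the processing time to 0). The following rough, self-contained Scala sketch shows what such a harness-based check can look like. It is an illustration only: the method name, the element timestamp, the watermark and processing-time values used to fire the window, and the final assertion are assumptions, not a copy of the helper in this commit.

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.operators.{OneInputStreamOperator, OutputTypeConfigurable}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.junit.Assert.assertTrue

object TranslationCheckSketch {

  // Hypothetical helper sketching the final check of a window translation test.
  def checkOperatorProducesOutput[K, IN, OUT](
      operator: OneInputStreamOperator[IN, OUT],
      keySelector: KeySelector[IN, K],
      keyType: TypeInformation[K],
      element: IN): Unit = {

    val testHarness =
      new KeyedOneInputStreamOperatorTestHarness[K, IN, OUT](operator, keySelector, keyType)

    // Fold-based operators need an output type before open(); mirrors the check in the diff above.
    if (operator.isInstanceOf[OutputTypeConfigurable[String]]) {
      operator.asInstanceOf[OutputTypeConfigurable[String]]
        .setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig)
    }

    testHarness.open()
    testHarness.setProcessingTime(0)

    // Push one element, then advance the watermark and the processing time far enough that
    // any event-time or processing-time window fires (the concrete values are illustrative).
    testHarness.processElement(new StreamRecord[IN](element, 0L))
    testHarness.processWatermark(new Watermark(Long.MaxValue))
    testHarness.setProcessingTime(Long.MaxValue)

    assertTrue(!testHarness.getOutput.isEmpty)

    testHarness.close()
  }
}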
 


[4/8] flink git commit: [hotfix] Fix trailing whitespace in WindowedStream.java

Posted by al...@apache.org.
[hotfix] Fix trailing whitespace in WindowedStream.java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/82db667d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/82db667d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/82db667d

Branch: refs/heads/master
Commit: 82db667d319778f30244e9c4212bf3a2920f604f
Parents: 09fe4b0
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Feb 7 10:54:54 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 17 17:15:51 2017 +0100

----------------------------------------------------------------------
 .../api/datastream/WindowedStream.java          | 34 ++++++++++----------
 1 file changed, 17 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/82db667d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 3fbdda8..04da04d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -89,7 +89,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 * Note that the {@code WindowedStream} is purely an API construct; during runtime
  * the {@code WindowedStream} will be collapsed together with the
  * {@code KeyedStream} and the operation over the window into one single operation.
- * 
+ *
  * @param <T> The type of elements in the stream.
  * @param <K> The type of the key by which elements are grouped.
  * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
@@ -190,9 +190,9 @@ public class WindowedStream<T, K, W extends Window> {
 	 * so a few elements are stored per key (one per slide interval).
 	 * Custom windows may not be able to incrementally aggregate, or may need to store extra values
 	 * in an aggregation tree.
-	 * 
+	 *
 	 * @param function The reduce function.
-	 * @return The data stream that is the result of applying the reduce function to the window. 
+	 * @return The data stream that is the result of applying the reduce function to the window.
 	 */
 	@SuppressWarnings("unchecked")
 	public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
@@ -521,10 +521,10 @@ public class WindowedStream<T, K, W extends Window> {
 	 *
 	 * @param function The aggregation function.
 	 * @return The data stream that is the result of applying the fold function to the window.
-	 * 
+	 *
 	 * @param <ACC> The type of the AggregateFunction's accumulator
 	 * @param <R> The type of the elements in the resulting stream, equal to the
-	 *            AggregateFunction's result type   
+	 *            AggregateFunction's result type
 	 */
 	public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
 		checkNotNull(function, "function");
@@ -549,10 +549,10 @@ public class WindowedStream<T, K, W extends Window> {
 	 *
 	 * @param function The aggregation function.
 	 * @return The data stream that is the result of applying the aggregation function to the window.
-	 * 
+	 *
 	 * @param <ACC> The type of the AggregateFunction's accumulator
 	 * @param <R> The type of the elements in the resulting stream, equal to the
-	 *            AggregateFunction's result type  
+	 *            AggregateFunction's result type
 	 */
 	public <ACC, R> SingleOutputStreamOperator<R> aggregate(
 			AggregateFunction<T, ACC, R> function,
@@ -581,11 +581,11 @@ public class WindowedStream<T, K, W extends Window> {
 	 *
 	 * @param aggFunction The aggregate function that is used for incremental aggregation.
 	 * @param windowFunction The window function.
-	 * 
+	 *
 	 * @return The data stream that is the result of applying the window function to the window.
-	 * 
+	 *
 	 * @param <ACC> The type of the AggregateFunction's accumulator
-	 * @param <V> The type of AggregateFunction's result, and the WindowFunction's input  
+	 * @param <V> The type of AggregateFunction's result, and the WindowFunction's input
 	 * @param <R> The type of the elements in the resulting stream, equal to the
 	 *            WindowFunction's result type
 	 */
@@ -620,17 +620,17 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param windowFunction The window function.
 	 * @param accumulatorType Type information for the internal accumulator type of the aggregation function
 	 * @param resultType Type information for the result type of the window function
-	 *    
+	 *
 	 * @return The data stream that is the result of applying the window function to the window.
-	 * 
+	 *
 	 * @param <ACC> The type of the AggregateFunction's accumulator
-	 * @param <V> The type of AggregateFunction's result, and the WindowFunction's input  
+	 * @param <V> The type of AggregateFunction's result, and the WindowFunction's input
 	 * @param <R> The type of the elements in the resulting stream, equal to the
 	 *            WindowFunction's result type
 	 */
 	public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
 			AggregateFunction<T, ACC, V> aggregateFunction,
-			WindowFunction<V, R, K, W> windowFunction, 
+			WindowFunction<V, R, K, W> windowFunction,
 			TypeInformation<ACC> accumulatorType,
 			TypeInformation<V> aggregateResultType,
 			TypeInformation<R> resultType) {
@@ -699,7 +699,7 @@ public class WindowedStream<T, K, W extends Window> {
 	// ------------------------------------------------------------------------
 	//  Window Function (apply)
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Applies the given window function to each window. The window function is called for each
 	 * evaluation of the window for each key individually. The output of the window function is
@@ -708,7 +708,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * <p>
 	 * Note that this function requires that all data in the windows is buffered until the window
 	 * is evaluated, as the function provides no means of incremental aggregation.
-	 * 
+	 *
 	 * @param function The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
@@ -1229,7 +1229,7 @@ public class WindowedStream<T, K, W extends Window> {
 				@SuppressWarnings("unchecked")
 				OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
 						new AggregatingProcessingTimeWindowOperator<>(
-								reducer, input.getKeySelector(), 
+								reducer, input.getKeySelector(),
 								input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 								input.getType().createSerializer(getExecutionEnvironment().getConfig()),
 								windowLength, windowSlide);


[6/8] flink git commit: [FLINK-4997] [streaming] Introduce ProcessWindowFunction

Posted by al...@apache.org.
[FLINK-4997] [streaming] Introduce ProcessWindowFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1dcb2dcd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1dcb2dcd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1dcb2dcd

Branch: refs/heads/master
Commit: 1dcb2dcd8969941988a4fc7e5488e9272dfd507e
Parents: 82db667
Author: Ventura Del Monte <ve...@gmail.com>
Authored: Wed Nov 23 18:00:23 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 17 17:15:51 2017 +0100

----------------------------------------------------------------------
 .../api/datastream/WindowedStream.java          | 381 ++++++++++++++---
 .../FoldApplyProcessWindowFunction.java         | 120 ++++++
 .../windowing/ProcessWindowFunction.java        |  61 +++
 .../ReduceApplyProcessWindowFunction.java       |  80 ++++
 .../windowing/RichProcessWindowFunction.java    |  85 ++++
 .../windowing/AccumulatingKeyedTimePanes.java   |  12 +-
 ...ccumulatingProcessingTimeWindowOperator.java |  16 +-
 .../InternalIterableProcessWindowFunction.java  |  63 +++
 ...nternalSingleValueProcessWindowFunction.java |  66 +++
 .../FoldApplyProcessWindowFunctionTest.java     | 155 +++++++
 .../operators/FoldApplyWindowFunctionTest.java  |  28 +-
 .../functions/InternalWindowFunctionTest.java   | 101 ++++-
 ...AlignedProcessingTimeWindowOperatorTest.java | 419 ++++++++++++++++++-
 .../operators/windowing/WindowOperatorTest.java | 177 ++++++++
 .../streaming/runtime/WindowFoldITCase.java     |  78 ++++
 15 files changed, 1738 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
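
To make the new API concrete, here is a minimal usage sketch in Scala that mirrors the translation tests added in [1/8] above. Only the reduce(reduceFunction, processWindowFunction) call and the process(key, window: Context, input, out) shape come from this patch series; the sample elements, the window size, the inlined ReduceFunction and the job name are illustrative assumptions. The point of the combination, as the javadoc in the diff below states, is that arriving data is incrementally aggregated by the ReduceFunction, so the window state holds a single value per key and window, while the ProcessWindowFunction still runs once per window firing with access to the window Context.

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object ReduceWithProcessWindowFunctionExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val counts = env
      .fromElements(("hello", 1), ("hello", 2), ("world", 3))
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
      .reduce(
        // incremental pre-aggregation: only one value per key and window is kept as state
        new ReduceFunction[(String, Int)] {
          override def reduce(a: (String, Int), b: (String, Int)): (String, Int) =
            (a._1, a._2 + b._2)
        },
        // invoked once per window firing with the pre-aggregated value and the window Context
        new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
          override def process(
              key: String,
              window: Context,
              input: Iterable[(String, Int)],
              out: Collector[(String, Int)]): Unit =
            input foreach (x => out.collect(x))
        })

    counts.print()
    env.execute("reduce with ProcessWindowFunction (sketch)")
  }
}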


http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 04da04d..45eaae5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.AggregateFunction;
@@ -39,8 +40,11 @@ import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.functions.windowing.AggregateApplyWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceApplyProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -60,9 +64,12 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -360,6 +367,98 @@ public class WindowedStream<T, K, W extends Window> {
 		return input.transform(opName, resultType, operator);
 	}
 
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>Arriving data is incrementally aggregated using the given reducer.
+	 *
+	 * @param reduceFunction The reduce function that is used for incremental aggregation.
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	@PublicEvolving
+	public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function) {
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+				function, ProcessWindowFunction.class, true, true, input.getType(), null, false);
+
+		return reduce(reduceFunction, function, resultType);
+	}
+
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>Arriving data is incrementally aggregated using the given reducer.
+	 *
+	 * @param reduceFunction The reduce function that is used for incremental aggregation.
+	 * @param function The window function.
+	 * @param resultType Type information for the result type of the window function
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	@Internal
+	public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
+		if (reduceFunction instanceof RichFunction) {
+			throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
+		}
+		//clean the closures
+		function = input.getExecutionEnvironment().clean(function);
+		reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "WindowedStream." + callLocation;
+
+		String opName;
+		KeySelector<T, K> keySel = input.getKeySelector();
+
+		OneInputStreamOperator<T, R> operator;
+
+		if (evictor != null) {
+			@SuppressWarnings({"unchecked", "rawtypes"})
+			TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+					(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+			ListStateDescriptor<StreamRecord<T>> stateDesc =
+					new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+			operator =
+					new EvictingWindowOperator<>(windowAssigner,
+							windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+							keySel,
+							input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+							stateDesc,
+							new InternalIterableProcessWindowFunction<>(new ReduceApplyProcessWindowFunction<>(reduceFunction, function)),
+							trigger,
+							evictor,
+							allowedLateness);
+
+		} else {
+			ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
+					reduceFunction,
+					input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+			operator =
+					new WindowOperator<>(windowAssigner,
+							windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+							keySel,
+							input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+							stateDesc,
+							new InternalSingleValueProcessWindowFunction<>(function),
+							trigger,
+							allowedLateness);
+		}
+
+		return input.transform(opName, resultType, operator);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Fold Function
 	// ------------------------------------------------------------------------
@@ -510,6 +609,117 @@ public class WindowedStream<T, K, W extends Window> {
 		return input.transform(opName, resultType, operator);
 	}
 
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>Arriving data is incrementally aggregated using the given fold function.
+	 *
+	 * @param initialValue The initial value of the fold.
+	 * @param foldFunction The fold function that is used for incremental aggregation.
+	 * @param windowFunction The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	@PublicEvolving
+	public <R, ACC> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction) {
+		if (foldFunction instanceof RichFunction) {
+			throw new UnsupportedOperationException("FoldFunction can not be a RichFunction.");
+		}
+
+		TypeInformation<ACC> foldResultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
+				Utils.getCallLocationName(), true);
+
+		TypeInformation<R> windowResultType = TypeExtractor.getUnaryOperatorReturnType(
+				windowFunction, ProcessWindowFunction.class, true, true, foldResultType, Utils.getCallLocationName(), false);
+
+		return fold(initialValue, foldFunction, windowFunction, foldResultType, windowResultType);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>Arriving data is incrementally aggregated using the given fold function.
+	 *
+	 * @param initialValue The initial value to be passed to the first invocation of the fold function.
+	 * @param foldFunction The fold function.
+	 * @param windowFunction The process window function.
+	 * @param foldResultType The result type of the fold function.
+	 * @param windowResultType The process window function result type.
+	 * @return The data stream that is the result of applying the fold function to the window.
+	 */
+	@Internal
+	public <R, ACC> SingleOutputStreamOperator<R> fold(
+			ACC initialValue,
+			FoldFunction<T, ACC> foldFunction,
+			ProcessWindowFunction<ACC, R, K, W> windowFunction,
+			TypeInformation<ACC> foldResultType,
+			TypeInformation<R> windowResultType) {
+		if (foldFunction instanceof RichFunction) {
+			throw new UnsupportedOperationException("FoldFunction can not be a RichFunction.");
+		}
+		if (windowAssigner instanceof MergingWindowAssigner) {
+			throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
+		}
+
+		//clean the closures
+		windowFunction = input.getExecutionEnvironment().clean(windowFunction);
+		foldFunction = input.getExecutionEnvironment().clean(foldFunction);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "WindowedStream." + callLocation;
+
+		String opName;
+		KeySelector<T, K> keySel = input.getKeySelector();
+
+		OneInputStreamOperator<T, R> operator;
+
+		if (evictor != null) {
+			@SuppressWarnings({"unchecked", "rawtypes"})
+			TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+					(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+			ListStateDescriptor<StreamRecord<T>> stateDesc =
+					new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+			operator =
+					new EvictingWindowOperator<>(windowAssigner,
+							windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+							keySel,
+							input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+							stateDesc,
+							new InternalIterableProcessWindowFunction<>(new FoldApplyProcessWindowFunction<>(initialValue, foldFunction, windowFunction, foldResultType)),
+							trigger,
+							evictor,
+							allowedLateness);
+
+		} else {
+			FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents",
+					initialValue,
+					foldFunction,
+					foldResultType.createSerializer(getExecutionEnvironment().getConfig()));
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+			operator =
+					new WindowOperator<>(windowAssigner,
+							windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+							keySel,
+							input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+							stateDesc,
+							new InternalSingleValueProcessWindowFunction<>(windowFunction),
+							trigger,
+							allowedLateness);
+		}
+
+		return input.transform(opName, windowResultType, operator);
+	}
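
The two branches above differ only in where the accumulation happens: without an evictor the fold runs eagerly against the FoldingState, so the ProcessWindowFunction receives exactly one accumulator, while with an evictor the raw records are buffered in a ListState and folded at firing time by FoldApplyProcessWindowFunction. For illustration, a minimal usage sketch of the public fold overload; the stream contents, key selector and window size are invented for the example:

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class FoldWithProcessWindowFunctionSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

		env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2), Tuple2.of("ciao", 3))
			.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
				@Override
				public String getKey(Tuple2<String, Integer> value) {
					return value.f0;
				}
			})
			.window(TumblingEventTimeWindows.of(Time.seconds(1)))
			.fold(
				0,
				new FoldFunction<Tuple2<String, Integer>, Integer>() {
					@Override
					public Integer fold(Integer acc, Tuple2<String, Integer> value) {
						return acc + value.f1;
					}
				},
				new ProcessWindowFunction<Integer, String, String, TimeWindow>() {
					@Override
					public void process(String key, Context ctx, Iterable<Integer> accumulators, Collector<String> out) {
						// exactly one accumulator per key and window reaches this function
						out.collect(key + " @ " + ctx.window().getEnd() + " -> " + accumulators.iterator().next());
					}
				})
			.print();

		env.execute("fold + ProcessWindowFunction sketch");
	}
}
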
+
 	// ------------------------------------------------------------------------
 	//  Aggregation Function
 	// ------------------------------------------------------------------------
@@ -733,11 +943,53 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
 	public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
-
-		//clean the closure
+		String callLocation = Utils.getCallLocationName();
 		function = input.getExecutionEnvironment().clean(function);
+		return apply(new InternalIterableWindowFunction<>(function), resultType, callLocation);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Note that this function requires that all data in the windows is buffered until the window
+	 * is evaluated, as the function provides no means of incremental aggregation.
+	 *
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	@PublicEvolving
+	public <R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function) {
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+			function, ProcessWindowFunction.class, true, true, getInputType(), null, false);
+
+		return process(function, resultType);
+	}
 
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Note that this function requires that all data in the windows is buffered until the window
+	 * is evaluated, as the function provides no means of incremental aggregation.
+	 *
+	 * @param function The window function.
+	 * @param resultType Type information for the result type of the window function
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	@Internal
+	public <R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
 		String callLocation = Utils.getCallLocationName();
+		function = input.getExecutionEnvironment().clean(function);
+		return apply(new InternalIterableProcessWindowFunction<>(function), resultType, callLocation);
+	}
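
A usage sketch of the new process() entry point. As the Javadoc notes, every element of the window is buffered until the window fires, and the Context hands the window metadata to the user function. Stream contents and window size below are illustrative only:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class ProcessWindowFunctionUsageSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

		env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2), Tuple2.of("ciao", 3))
			.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
				@Override
				public String getKey(Tuple2<String, Integer> value) {
					return value.f0;
				}
			})
			.window(TumblingEventTimeWindows.of(Time.seconds(1)))
			.process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
				@Override
				public void process(String key, Context ctx, Iterable<Tuple2<String, Integer>> elements, Collector<String> out) {
					// all elements of the window are buffered and handed in here
					int count = 0;
					for (Tuple2<String, Integer> ignored : elements) {
						count++;
					}
					out.collect(key + " has " + count + " element(s) in window ending at " + ctx.window().getEnd());
				}
			})
			.print();

		env.execute("process(ProcessWindowFunction) sketch");
	}
}
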
+
+	private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, String callLocation) {
+
 		String udfName = "WindowedStream." + callLocation;
 
 		SingleOutputStreamOperator<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
@@ -767,7 +1019,7 @@ public class WindowedStream<T, K, W extends Window> {
 					keySel,
 					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					stateDesc,
-					new InternalIterableWindowFunction<>(function),
+					function,
 					trigger,
 					evictor,
 					allowedLateness);
@@ -784,7 +1036,7 @@ public class WindowedStream<T, K, W extends Window> {
 					keySel,
 					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					stateDesc,
-					new InternalIterableWindowFunction<>(function),
+					function,
 					trigger,
 					allowedLateness,
 					legacyWindowOpType);
@@ -1211,7 +1463,7 @@ public class WindowedStream<T, K, W extends Window> {
 	}
 
 	private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
-			Function function,
+			ReduceFunction<?> function,
 			TypeInformation<R> resultType,
 			String functionName) {
 
@@ -1222,30 +1474,18 @@ public class WindowedStream<T, K, W extends Window> {
 
 			String opName = "Fast " + timeWindows + " of " + functionName;
 
-			if (function instanceof ReduceFunction) {
-				@SuppressWarnings("unchecked")
-				ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
-				@SuppressWarnings("unchecked")
-				OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
-						new AggregatingProcessingTimeWindowOperator<>(
-								reducer, input.getKeySelector(),
-								input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-								input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-								windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
-			}
-			else if (function instanceof WindowFunction) {
-				@SuppressWarnings("unchecked")
-				WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
-
-				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
-						wf, input.getKeySelector(),
-						input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-						input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-						windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
-			}
+			@SuppressWarnings("unchecked")
+			ReduceFunction<T> reducer = (ReduceFunction<T>) function;
+
+			@SuppressWarnings("unchecked")
+			OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
+					new AggregatingProcessingTimeWindowOperator<>(
+							reducer, input.getKeySelector(),
+							input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+							input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+							windowLength, windowSlide);
+			return input.transform(opName, resultType, op);
+
 		} else if (windowAssigner.getClass() == TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
 			TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner;
 			final long windowLength = timeWindows.getSize();
@@ -1253,36 +1493,69 @@ public class WindowedStream<T, K, W extends Window> {
 
 			String opName = "Fast " + timeWindows + " of " + functionName;
 
-			if (function instanceof ReduceFunction) {
-				@SuppressWarnings("unchecked")
-				ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
-				@SuppressWarnings("unchecked")
-				OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
-						new AggregatingProcessingTimeWindowOperator<>(
-								reducer,
-								input.getKeySelector(),
-								input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-								input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-								windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
-			}
-			else if (function instanceof WindowFunction) {
-				@SuppressWarnings("unchecked")
-				WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
-
-				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
-						wf, input.getKeySelector(),
-						input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-						input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-						windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
-			}
+			@SuppressWarnings("unchecked")
+			ReduceFunction<T> reducer = (ReduceFunction<T>) function;
+
+			@SuppressWarnings("unchecked")
+			OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
+					new AggregatingProcessingTimeWindowOperator<>(
+							reducer,
+							input.getKeySelector(),
+							input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+							input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+							windowLength, windowSlide);
+			return input.transform(opName, resultType, op);
+		}
+
+		return null;
+	}
+
+	private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
+			InternalWindowFunction<Iterable<T>, R, K, W> function,
+			TypeInformation<R> resultType,
+			String functionName) {
+
+		if (windowAssigner.getClass() == SlidingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
+			SlidingAlignedProcessingTimeWindows timeWindows = (SlidingAlignedProcessingTimeWindows) windowAssigner;
+			final long windowLength = timeWindows.getSize();
+			final long windowSlide = timeWindows.getSlide();
+
+			String opName = "Fast " + timeWindows + " of " + functionName;
+
+			@SuppressWarnings("unchecked")
+			InternalWindowFunction<Iterable<T>, R, K, TimeWindow> timeWindowFunction =
+					(InternalWindowFunction<Iterable<T>, R, K, TimeWindow>) function;
+
+			OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
+					timeWindowFunction, input.getKeySelector(),
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+					windowLength, windowSlide);
+			return input.transform(opName, resultType, op);
+		} else if (windowAssigner.getClass() == TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
+			TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner;
+			final long windowLength = timeWindows.getSize();
+			final long windowSlide = timeWindows.getSize();
+
+			String opName = "Fast " + timeWindows + " of " + functionName;
+
+			@SuppressWarnings("unchecked")
+			InternalWindowFunction<Iterable<T>, R, K, TimeWindow> timeWindowFunction =
+					(InternalWindowFunction<Iterable<T>, R, K, TimeWindow>) function;
+
+			OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
+					timeWindowFunction, input.getKeySelector(),
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+					windowLength, windowSlide);
+			return input.transform(opName, resultType, op);
 		}
 
 		return null;
 	}
 
+
 	public StreamExecutionEnvironment getExecutionEnvironment() {
 		return input.getExecutionEnvironment();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
new file mode 100644
index 0000000..e1bc759
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+
+@Internal
+public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
+	extends RichProcessWindowFunction<T, R, K, W>
+	implements OutputTypeConfigurable<R> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final FoldFunction<T, ACC> foldFunction;
+	private final ProcessWindowFunction<ACC, R, K, W> windowFunction;
+
+	private byte[] serializedInitialValue;
+	private TypeSerializer<ACC> accSerializer;
+	private final TypeInformation<ACC> accTypeInformation;
+	private transient ACC initialValue;
+
+	public FoldApplyProcessWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction, TypeInformation<ACC> accTypeInformation) {
+		this.windowFunction = windowFunction;
+		this.foldFunction = foldFunction;
+		this.initialValue = initialValue;
+		this.accTypeInformation = accTypeInformation;
+	}
+
+	@Override
+	public void open(Configuration configuration) throws Exception {
+		FunctionUtils.openFunction(this.windowFunction, configuration);
+
+		if (serializedInitialValue == null) {
+			throw new RuntimeException("No initial value was serialized for the fold " +
+				"window function. Probably the setOutputType method was not called.");
+		}
+
+		ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
+		DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
+		initialValue = accSerializer.deserialize(in);
+	}
+
+	@Override
+	public void close() throws Exception {
+		FunctionUtils.closeFunction(this.windowFunction);
+	}
+
+	@Override
+	public void setRuntimeContext(RuntimeContext t) {
+		super.setRuntimeContext(t);
+
+		FunctionUtils.setFunctionRuntimeContext(this.windowFunction, t);
+	}
+
+	@Override
+	public void process(K key, final Context context, Iterable<T> values, Collector<R> out) throws Exception {
+		ACC result = accSerializer.copy(initialValue);
+
+		for (T val : values) {
+			result = foldFunction.fold(result, val);
+		}
+
+		windowFunction.process(key, windowFunction.new Context() {
+			@Override
+			public W window() {
+				return context.window();
+			}
+		}, Collections.singletonList(result), out);
+	}
+
+	@Override
+	public void setOutputType(TypeInformation<R> outTypeInfo, ExecutionConfig executionConfig) {
+		accSerializer = accTypeInformation.createSerializer(executionConfig);
+
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+
+		try {
+			accSerializer.serialize(initialValue, out);
+		} catch (IOException ioe) {
+			throw new RuntimeException("Unable to serialize initial value of type " +
+				initialValue.getClass().getSimpleName() + " of fold window function.", ioe);
+		}
+
+		serializedInitialValue = baos.toByteArray();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
new file mode 100644
index 0000000..9c48e24
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+/**
+ * Base abstract class for functions that are evaluated over keyed (grouped) windows using a context
+ * for retrieving extra information.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
+ * @param <W> The type of {@code Window} that this window function can be applied on.
+ */
+@PublicEvolving
+public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Evaluates the window and outputs none or several elements.
+	 *
+	 * @param key The key for which this window is evaluated.
+	 * @param context The context in which the window is being evaluated.
+	 * @param elements The elements in the window being evaluated.
+	 * @param out A collector for emitting elements.
+	 *
+	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+	 */
+	public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
+
+	/**
+	 * The context holding window metadata.
+	 */
+	public abstract class Context {
+		/**
+		 * @return The window that is being evaluated.
+		 */
+		public abstract W window();
+	}
+}
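
Because Context is an inner class of the function itself, a caller creates one with the qualified fn.new Context() {...} form, which is exactly how the internal wrappers and tests in this change drive a ProcessWindowFunction directly. A small standalone sketch along those lines; the class name, key and values are invented for the example:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class ProcessWindowFunctionContextSketch {

	/** Emits "<key>:<element count>" once per window evaluation. */
	static class CountingFunction extends ProcessWindowFunction<Long, String, String, TimeWindow> {

		private static final long serialVersionUID = 1L;

		@Override
		public void process(String key, Context context, Iterable<Long> elements, Collector<String> out) {
			int count = 0;
			for (Long ignored : elements) {
				count++;
			}
			out.collect(key + ":" + count + " (window end " + context.window().getEnd() + ")");
		}
	}

	public static void main(String[] args) throws Exception {
		CountingFunction fn = new CountingFunction();
		List<String> output = new ArrayList<>();

		// Context is an inner class of the function instance, so it is created
		// via fn.new Context() { ... }, the same pattern the internal wrappers use.
		fn.process("hello", fn.new Context() {
			@Override
			public TimeWindow window() {
				return new TimeWindow(0, 1000);
			}
		}, Arrays.asList(1L, 2L, 3L), new ListCollector<>(output));

		System.out.println(output); // prints [hello:3 (window end 1000)]
	}
}
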

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
new file mode 100644
index 0000000..9ea1fdf
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+@Internal
+public class ReduceApplyProcessWindowFunction<K, W extends Window, T, R>
+	extends RichProcessWindowFunction<T, R, K, W> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final ReduceFunction<T> reduceFunction;
+	private final ProcessWindowFunction<T, R, K, W> windowFunction;
+
+	public ReduceApplyProcessWindowFunction(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> windowFunction) {
+		this.windowFunction = windowFunction;
+		this.reduceFunction = reduceFunction;
+	}
+
+	@Override
+	public void process(K k, final Context context, Iterable<T> input, Collector<R> out) throws Exception {
+
+		T curr = null;
+		for (T val: input) {
+			if (curr == null) {
+				curr = val;
+			} else {
+				curr = reduceFunction.reduce(curr, val);
+			}
+		}
+		windowFunction.process(k, windowFunction.new Context() {
+			@Override
+			public W window() {
+				return context.window();
+			}
+		}, Collections.singletonList(curr), out);
+	}
+
+	@Override
+	public void open(Configuration configuration) throws Exception {
+		FunctionUtils.openFunction(this.windowFunction, configuration);
+	}
+
+	@Override
+	public void close() throws Exception {
+		FunctionUtils.closeFunction(this.windowFunction);
+	}
+
+	@Override
+	public void setRuntimeContext(RuntimeContext t) {
+		super.setRuntimeContext(t);
+
+		FunctionUtils.setFunctionRuntimeContext(this.windowFunction, t);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
new file mode 100644
index 0000000..ac55bc6
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Base rich abstract class for functions that are evaluated over keyed (grouped) windows using a context
+ * for passing extra information.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
+ * @param <W> The type of {@code Window} that this window function can be applied on.
+ */
+@PublicEvolving
+public abstract class RichProcessWindowFunction<IN, OUT, KEY, W extends Window>
+		extends ProcessWindowFunction<IN, OUT, KEY, W>
+		implements RichFunction {
+
+	private static final long serialVersionUID = 1L;
+
+
+	// --------------------------------------------------------------------------------------------
+	//  Runtime context access
+	// --------------------------------------------------------------------------------------------
+
+	private transient RuntimeContext runtimeContext;
+
+	@Override
+	public void setRuntimeContext(RuntimeContext t) {
+		this.runtimeContext = t;
+	}
+
+	@Override
+	public RuntimeContext getRuntimeContext() {
+		if (this.runtimeContext != null) {
+			return this.runtimeContext;
+		} else {
+			throw new IllegalStateException("The runtime context has not been initialized.");
+		}
+	}
+
+	@Override
+	public IterationRuntimeContext getIterationRuntimeContext() {
+		if (this.runtimeContext == null) {
+			throw new IllegalStateException("The runtime context has not been initialized.");
+		} else if (this.runtimeContext instanceof IterationRuntimeContext) {
+			return (IterationRuntimeContext) this.runtimeContext;
+		} else {
+			throw new IllegalStateException("This stub is not part of an iteration step function.");
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Default life cycle methods
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void open(Configuration parameters) throws Exception {}
+
+	@Override
+	public void close() throws Exception {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index a252ece..87c5aca 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -20,8 +20,8 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.util.UnionIterator;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
@@ -36,7 +36,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 
 	private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory();
 
-	private final WindowFunction<Type, Result, Key, Window> function;
+	private final InternalWindowFunction<Iterable<Type>, Result, Key, Window> function;
 
 	/**
 	 * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */
@@ -44,7 +44,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 
 	// ------------------------------------------------------------------------
 	
-	public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, WindowFunction<Type, Result, Key, Window> function) {
+	public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, InternalWindowFunction<Iterable<Type>, Result, Key, Window> function) {
 		this.keySelector = keySelector;
 		this.function = function;
 	}
@@ -59,7 +59,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 	}
 
 	@Override
-	public void evaluateWindow(Collector<Result> out, TimeWindow window, 
+	public void evaluateWindow(Collector<Result> out, final TimeWindow window,
 								AbstractStreamOperator<Result> operator) throws Exception
 	{
 		if (previousPanes.isEmpty()) {
@@ -86,7 +86,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 	
 	static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
 
-		private final WindowFunction<Type, Result, Key, Window> function;
+		private final InternalWindowFunction<Iterable<Type>, Result, Key, Window> function;
 		
 		private final UnionIterator<Type> unionIterator;
 		
@@ -99,7 +99,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 		private Key currentKey;
 		
 
-		WindowFunctionTraversal(WindowFunction<Type, Result, Key, Window> function, TimeWindow window,
+		WindowFunctionTraversal(InternalWindowFunction<Iterable<Type>, Result, Key, Window> function, TimeWindow window,
 								Collector<Result> out, AbstractStreamOperator<Result> contextOperator) {
 			this.function = function;
 			this.out = out;

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 7adaf13..094b34d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -23,22 +23,22 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.state.ArrayListSerializer;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 
 import java.util.ArrayList;
 
 @Internal
 @Deprecated
 public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
-		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, WindowFunction<IN, OUT, KEY, TimeWindow>> {
+		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, InternalWindowFunction<Iterable<IN>, OUT, KEY, TimeWindow>> {
 
 	private static final long serialVersionUID = 7305948082830843475L;
 
-
+	
 	public AccumulatingProcessingTimeWindowOperator(
-			WindowFunction<IN, OUT, KEY, TimeWindow> function,
+			InternalWindowFunction<Iterable<IN>, OUT, KEY, TimeWindow> function,
 			KeySelector<IN, KEY> keySelector,
 			TypeSerializer<KEY> keySerializer,
 			TypeSerializer<IN> valueSerializer,
@@ -46,14 +46,14 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
 			long windowSlide)
 	{
 		super(function, keySelector, keySerializer,
-				new ArrayListSerializer<IN>(valueSerializer), windowLength, windowSlide);
+				new ArrayListSerializer<>(valueSerializer), windowLength, windowSlide);
 	}
 
 	@Override
 	protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
 		@SuppressWarnings("unchecked")
-		WindowFunction<IN, OUT, KEY, Window> windowFunction = (WindowFunction<IN, OUT, KEY, Window>) function;
-
+		InternalWindowFunction<Iterable<IN>, OUT, KEY, Window> windowFunction = (InternalWindowFunction<Iterable<IN>, OUT, KEY, Window>) function;
+		
 		return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
new file mode 100644
index 0000000..de516a5
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+/**
+ * Internal window function for wrapping a {@link ProcessWindowFunction} that takes an {@code Iterable}
+ * when the window state also is an {@code Iterable}.
+ */
+public final class InternalIterableProcessWindowFunction<IN, OUT, KEY, W extends Window>
+		extends WrappingFunction<ProcessWindowFunction<IN, OUT, KEY, W>>
+		implements InternalWindowFunction<Iterable<IN>, OUT, KEY, W> {
+
+	private static final long serialVersionUID = 1L;
+
+	public InternalIterableProcessWindowFunction(ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction) {
+		super(wrappedFunction);
+	}
+
+	@Override
+	public void apply(KEY key, final W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+		ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
+		ProcessWindowFunction<IN, OUT, KEY, W>.Context context = wrappedFunction.new Context() {
+			@Override
+			public W window() {
+				return window;
+			}
+		};
+		
+		wrappedFunction.process(key, context, input, out);
+	}
+
+	@Override
+	public RuntimeContext getRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+
+	@Override
+	public IterationRuntimeContext getIterationRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
new file mode 100644
index 0000000..b28c208
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+/**
+ * Internal window function for wrapping a {@link ProcessWindowFunction} that takes an {@code Iterable}
+ * when the window state is a single value.
+ */
+public final class InternalSingleValueProcessWindowFunction<IN, OUT, KEY, W extends Window>
+		extends WrappingFunction<ProcessWindowFunction<IN, OUT, KEY, W>>
+		implements InternalWindowFunction<IN, OUT, KEY, W> {
+
+	private static final long serialVersionUID = 1L;
+
+	public InternalSingleValueProcessWindowFunction(ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction) {
+		super(wrappedFunction);
+	}
+
+	@Override
+	public void apply(KEY key, final W window, IN input, Collector<OUT> out) throws Exception {
+		ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
+		ProcessWindowFunction<IN, OUT, KEY, W>.Context context = wrappedFunction.new Context() {
+			@Override
+			public W window() {
+				return window;
+			}
+		};
+
+		wrappedFunction.process(key, context, Collections.singletonList(input), out);
+	}
+
+	@Override
+	public RuntimeContext getRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+
+	@Override
+	public IterationRuntimeContext getIterationRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
new file mode 100644
index 0000000..af5c77a
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FoldApplyProcessWindowFunctionTest {
+
+	/**
+	 * Tests that the FoldApplyProcessWindowFunction gets the output type serializer set by the
+	 * StreamGraphGenerator and checks that it computes the correct result.
+	 */
+	@Test
+	public void testFoldWindowFunctionOutputTypeConfigurable() throws Exception{
+		StreamExecutionEnvironment env = new DummyStreamExecutionEnvironment();
+
+		List<StreamTransformation<?>> transformations = new ArrayList<>();
+
+		int initValue = 1;
+
+		FoldApplyProcessWindowFunction<Integer, TimeWindow, Integer, Integer, Integer> foldWindowFunction = new FoldApplyProcessWindowFunction<>(
+			initValue,
+			new FoldFunction<Integer, Integer>() {
+				@Override
+				public Integer fold(Integer accumulator, Integer value) throws Exception {
+					return accumulator + value;
+				}
+
+			},
+			new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
+				@Override
+				public void process(Integer integer,
+									Context context,
+									Iterable<Integer> input,
+									Collector<Integer> out) throws Exception {
+					for (Integer in: input) {
+						out.collect(in);
+					}
+				}
+			},
+			BasicTypeInfo.INT_TYPE_INFO
+		);
+
+		AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>(
+			new InternalIterableProcessWindowFunction<>(foldWindowFunction),
+			new KeySelector<Integer, Integer>() {
+				private static final long serialVersionUID = -7951310554369722809L;
+
+				@Override
+				public Integer getKey(Integer value) throws Exception {
+					return value;
+				}
+			},
+			IntSerializer.INSTANCE,
+			IntSerializer.INSTANCE,
+			3000,
+			3000
+		);
+
+		SourceFunction<Integer> sourceFunction = new SourceFunction<Integer>(){
+
+			private static final long serialVersionUID = 8297735565464653028L;
+
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+
+			}
+
+			@Override
+			public void cancel() {
+
+			}
+		};
+
+		SourceTransformation<Integer> source = new SourceTransformation<>("", new StreamSource<>(sourceFunction), BasicTypeInfo.INT_TYPE_INFO, 1);
+
+		transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
+
+		StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations);
+
+		List<Integer> result = new ArrayList<>();
+		List<Integer> input = new ArrayList<>();
+		List<Integer> expected = new ArrayList<>();
+
+		input.add(1);
+		input.add(2);
+		input.add(3);
+
+		for (int value : input) {
+			initValue += value;
+		}
+
+		expected.add(initValue);
+
+		foldWindowFunction.process(0, foldWindowFunction.new Context() {
+			@Override
+			public TimeWindow window() {
+				return new TimeWindow(0, 1);
+			}
+		}, input, new ListCollector<>(result));
+
+		Assert.assertEquals(expected, result);
+	}
+
+	public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
+
+		@Override
+		public JobExecutionResult execute(String jobName) throws Exception {
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
index 91ec427..fecd440 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.transformations.SourceTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 import org.junit.Assert;
@@ -81,19 +82,20 @@ public class FoldApplyWindowFunctionTest {
 		);
 
 		AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>(
-			foldWindowFunction,
-			new KeySelector<Integer, Integer>() {
-				private static final long serialVersionUID = -7951310554369722809L;
-
-				@Override
-				public Integer getKey(Integer value) throws Exception {
-					return value;
-				}
-			},
-			IntSerializer.INSTANCE,
-			IntSerializer.INSTANCE,
-			3000,
-			3000
+			new InternalIterableWindowFunction<>(
+					foldWindowFunction),
+				new KeySelector<Integer, Integer>() {
+					private static final long serialVersionUID = -7951310554369722809L;
+
+					@Override
+					public Integer getKey(Integer value) throws Exception {
+						return value;
+					}
+				},
+				IntSerializer.INSTANCE,
+				IntSerializer.INSTANCE,
+				3000,
+				3000
 		);
 
 		SourceFunction<Integer> sourceFunction = new SourceFunction<Integer>(){

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index f3c3423..3c73035 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -25,12 +25,15 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils;
 import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.util.Collector;
 import org.hamcrest.collection.IsIterableContainingInOrder;
@@ -115,7 +118,48 @@ public class InternalWindowFunctionTest {
 		Collector<String> c = (Collector<String>) mock(Collector.class);
 
 		windowFunction.apply(42L, w, i, c);
-		verify(mock).apply(42L, w, i, c);
+		verify(mock).apply(eq(42L), eq(w), eq(i), eq(c));
+
+		// check close
+		windowFunction.close();
+		verify(mock).close();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testInternalIterableProcessWindowFunction() throws Exception {
+
+		ProcessWindowFunctionMock mock = mock(ProcessWindowFunctionMock.class);
+		InternalIterableProcessWindowFunction<Long, String, Long, TimeWindow> windowFunction =
+				new InternalIterableProcessWindowFunction<>(mock);
+
+		// check setOutputType
+		TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
+		ExecutionConfig execConf = new ExecutionConfig();
+		execConf.setParallelism(42);
+
+		StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf);
+		verify(mock).setOutputType(stringType, execConf);
+
+		// check open
+		Configuration config = new Configuration();
+
+		windowFunction.open(config);
+		verify(mock).open(config);
+
+		// check setRuntimeContext
+		RuntimeContext rCtx = mock(RuntimeContext.class);
+
+		windowFunction.setRuntimeContext(rCtx);
+		verify(mock).setRuntimeContext(rCtx);
+
+		// check apply
+		TimeWindow w = mock(TimeWindow.class);
+		Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
+		Collector<String> c = (Collector<String>) mock(Collector.class);
+
+		windowFunction.apply(42L, w, i, c);
+		verify(mock).process(eq(42L), (ProcessWindowFunctionMock.Context) anyObject(), eq(i), eq(c));
 
 		// check close
 		windowFunction.close();
@@ -204,6 +248,59 @@ public class InternalWindowFunctionTest {
 		verify(mock).close();
 	}
 
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testInternalSingleValueProcessWindowFunction() throws Exception {
+
+		ProcessWindowFunctionMock mock = mock(ProcessWindowFunctionMock.class);
+		InternalSingleValueProcessWindowFunction<Long, String, Long, TimeWindow> windowFunction =
+				new InternalSingleValueProcessWindowFunction<>(mock);
+
+		// check setOutputType
+		TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
+		ExecutionConfig execConf = new ExecutionConfig();
+		execConf.setParallelism(42);
+
+		StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf);
+		verify(mock).setOutputType(stringType, execConf);
+
+		// check open
+		Configuration config = new Configuration();
+
+		windowFunction.open(config);
+		verify(mock).open(config);
+
+		// check setRuntimeContext
+		RuntimeContext rCtx = mock(RuntimeContext.class);
+
+		windowFunction.setRuntimeContext(rCtx);
+		verify(mock).setRuntimeContext(rCtx);
+
+		// check apply
+		TimeWindow w = mock(TimeWindow.class);
+		Collector<String> c = (Collector<String>) mock(Collector.class);
+
+		windowFunction.apply(42L, w, 23L, c);
+		verify(mock).process(eq(42L), (ProcessWindowFunctionMock.Context) anyObject(), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
+
+		// check close
+		windowFunction.close();
+		verify(mock).close();
+	}
+
+	public static class ProcessWindowFunctionMock
+		extends RichProcessWindowFunction<Long, String, Long, TimeWindow>
+		implements OutputTypeConfigurable<String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
+
+		@Override
+		public void process(Long aLong, Context context, Iterable<Long> input, Collector<String> out) throws Exception { }
+	}
+
 	public static class WindowFunctionMock
 		extends RichWindowFunction<Long, String, Long, TimeWindow>
 		implements OutputTypeConfigurable<String> {
@@ -214,7 +311,7 @@ public class InternalWindowFunctionTest {
 		public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
 
 		@Override
-		public void apply(Long aLong, TimeWindow window, Iterable<Long> input, Collector<String> out) throws Exception { }
+		public void apply(Long aLong, TimeWindow w, Iterable<Long> input, Collector<String> out) throws Exception { }
 	}
 
 	public static class AllWindowFunctionMock

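A note on the verification style above: the new process-function checks have to use Mockito argument matchers (eq(...), anyObject()) because the wrapper constructs the Context internally, so the test can only match it with anyObject(); and once one argument is a matcher, Mockito requires every argument of that verify call to be a matcher, which is presumably also why the pre-existing apply() verification is switched from raw values to eq(...). Stripped of the Flink types, the delegation pattern these tests exercise looks like the following self-contained sketch (Delegate and Wrapper are hypothetical stand-ins, not Flink classes; assumes JUnit 4 and the Mockito 1.x API used above):

	import static org.mockito.Matchers.anyLong;
	import static org.mockito.Matchers.eq;
	import static org.mockito.Mockito.mock;
	import static org.mockito.Mockito.verify;

	import org.junit.Test;

	public class DelegationSketchTest {

		/** Hypothetical user-facing function, standing in for (Rich)ProcessWindowFunction. */
		public interface Delegate {
			void open();
			void process(long key, String value);
			void close();
		}

		/** Hypothetical internal wrapper, standing in for the Internal*ProcessWindowFunction classes. */
		public static class Wrapper {
			private final Delegate delegate;

			public Wrapper(Delegate delegate) {
				this.delegate = delegate;
			}

			public void open() {
				delegate.open();
			}

			// the wrapper may translate the call (e.g. apply() -> process()) on the way through
			public void apply(long key, String value) {
				delegate.process(key, value);
			}

			public void close() {
				delegate.close();
			}
		}

		@Test
		public void wrapperForwardsCallsToDelegate() {
			Delegate mock = mock(Delegate.class);
			Wrapper wrapper = new Wrapper(mock);

			wrapper.open();
			wrapper.apply(42L, "hello");
			wrapper.close();

			verify(mock).open();
			// once any argument is a matcher, every argument must be a matcher
			verify(mock).process(anyLong(), eq("hello"));
			verify(mock).close();
		}
	}
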
http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 255a20f..508d2e1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -36,8 +36,12 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -47,6 +51,9 @@ import org.apache.flink.util.Collector;
 
 import org.junit.After;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -64,10 +71,12 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @SuppressWarnings({"serial"})
+@PrepareForTest(InternalIterableWindowFunction.class)
+@RunWith(PowerMockRunner.class)
 public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	@SuppressWarnings("unchecked")
-	private final WindowFunction<String, String, String, TimeWindow> mockFunction = mock(WindowFunction.class);
+	private final InternalIterableWindowFunction<String, String, String, TimeWindow> mockFunction = mock(InternalIterableWindowFunction.class);
 
 	@SuppressWarnings("unchecked")
 	private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
@@ -79,26 +88,34 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		}
 	};
 	
-	private final WindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction =
-			new WindowFunction<Integer, Integer, Integer, TimeWindow>()
-	{
-		@Override
-		public void apply(Integer key,
-				TimeWindow window,
-				Iterable<Integer> values,
-				Collector<Integer> out) {
-			for (Integer val : values) {
-				assertEquals(key, val);
-				out.collect(val);
-			}
-		}
-	};
+	private final InternalIterableWindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction =
+			new InternalIterableWindowFunction<>(new WindowFunction<Integer, Integer, Integer, TimeWindow>() {
+				@Override
+				public void apply(Integer key, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
+					for (Integer val : values) {
+						assertEquals(key, val);
+						out.collect(val);
+					}
+				}
+			});
+
+	private final InternalIterableProcessWindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityProcessFunction =
+			new InternalIterableProcessWindowFunction<>(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
+				@Override
+				public void process(Integer key, Context context, Iterable<Integer> values, Collector<Integer> out) throws Exception {
+					for (Integer val : values) {
+						assertEquals(key, val);
+						out.collect(val);
+					}
+				}
+			});
 
 	// ------------------------------------------------------------------------
 
 	public AccumulatingAlignedProcessingTimeWindowOperatorTest() {
 		ClosureCleaner.clean(identitySelector, false);
 		ClosureCleaner.clean(validatingIdentityFunction, false);
+		ClosureCleaner.clean(validatingIdentityProcessFunction, false);
 	}
 	
 	// ------------------------------------------------------------------------
@@ -281,6 +298,50 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	}
 
 	@Test
+	public void testTumblingWindowWithProcessFunction() throws Exception {
+		try {
+			final int windowSize = 50;
+
+			// tumbling window that triggers every 50 milliseconds
+			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+				new AccumulatingProcessingTimeWindowOperator<>(
+					validatingIdentityProcessFunction, identitySelector,
+					IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+					windowSize, windowSize);
+
+			KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
+
+			testHarness.open();
+
+			final int numElements = 1000;
+
+			long currentTime = 0;
+
+			for (int i = 0; i < numElements; i++) {
+				testHarness.processElement(new StreamRecord<>(i));
+				currentTime = currentTime + 10;
+				testHarness.setProcessingTime(currentTime);
+			}
+
+
+			List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+			assertEquals(numElements, result.size());
+
+			Collections.sort(result);
+			for (int i = 0; i < numElements; i++) {
+				assertEquals(i, result.get(i).intValue());
+			}
+
+			testHarness.close();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
 	public void testSlidingWindow() throws Exception {
 
 		// tumbling window that triggers every 20 milliseconds
@@ -333,6 +394,58 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	}
 
 	@Test
+	public void testSlidingWindowWithProcessFunction() throws Exception {
+
+		// sliding window (150 msecs) that slides every 50 msecs
+		AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+			new AccumulatingProcessingTimeWindowOperator<>(
+				validatingIdentityProcessFunction, identitySelector,
+				IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+			new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.open();
+
+		final int numElements = 1000;
+
+		long currentTime = 0;
+
+		for (int i = 0; i < numElements; i++) {
+			testHarness.processElement(new StreamRecord<>(i));
+			currentTime = currentTime + 10;
+			testHarness.setProcessingTime(currentTime);
+		}
+
+		// get and verify the result
+		List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+
+		// if we kept this running, each element would be in the result three times (for each slide).
+		// we are closing the window before the final panes are through three times, so we may have fewer
+		// elements.
+		if (result.size() < numElements || result.size() > 3 * numElements) {
+			fail("Wrong number of results: " + result.size());
+		}
+
+		Collections.sort(result);
+		int lastNum = -1;
+		int lastCount = -1;
+
+		for (int num : result) {
+			if (num == lastNum) {
+				lastCount++;
+				assertTrue(lastCount <= 3);
+			}
+			else {
+				lastNum = num;
+				lastCount = 1;
+			}
+		}
+
+		testHarness.close();
+	}
+
+	@Test
 	public void testTumblingWindowSingleElements() throws Exception {
 
 		try {
@@ -379,7 +492,55 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
+	@Test
+	public void testTumblingWindowSingleElementsWithProcessFunction() throws Exception {
+
+		try {
+
+			// tumbling window that triggers every 50 milliseconds
+			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+				new AccumulatingProcessingTimeWindowOperator<>(
+					validatingIdentityProcessFunction, identitySelector,
+					IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50);
+
+			KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
+
+			testHarness.open();
+
+			testHarness.setProcessingTime(0);
+
+			testHarness.processElement(new StreamRecord<>(1));
+			testHarness.processElement(new StreamRecord<>(2));
+
+			testHarness.setProcessingTime(50);
+
+			testHarness.processElement(new StreamRecord<>(3));
+			testHarness.processElement(new StreamRecord<>(4));
+			testHarness.processElement(new StreamRecord<>(5));
+
+			testHarness.setProcessingTime(100);
+
+			testHarness.processElement(new StreamRecord<>(6));
+
+			testHarness.setProcessingTime(200);
+
+
+			List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+			assertEquals(6, result.size());
+
+			Collections.sort(result);
+			assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result);
+
+			testHarness.close();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
 	@Test
 	public void testSlidingWindowSingleElements() throws Exception {
 		try {
@@ -420,6 +581,126 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	}
 
 	@Test
+	public void testSlidingWindowSingleElementsWithProcessFunction() throws Exception {
+		try {
+
+			// sliding window (150 msecs) that slides every 50 msecs
+			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+				new AccumulatingProcessingTimeWindowOperator<>(
+					validatingIdentityProcessFunction, identitySelector,
+					IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
+
+			KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
+
+			testHarness.setProcessingTime(0);
+
+			testHarness.open();
+
+			testHarness.processElement(new StreamRecord<>(1));
+			testHarness.processElement(new StreamRecord<>(2));
+
+			testHarness.setProcessingTime(50);
+			testHarness.setProcessingTime(100);
+			testHarness.setProcessingTime(150);
+
+			List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+
+			assertEquals(6, result.size());
+
+			Collections.sort(result);
+			assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
+
+			testHarness.close();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void checkpointRestoreWithPendingWindowTumblingWithProcessFunction() {
+		try {
+			final int windowSize = 200;
+
+			// tumbling window that triggers every 200 milliseconds
+			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+				new AccumulatingProcessingTimeWindowOperator<>(
+					validatingIdentityProcessFunction, identitySelector,
+					IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+					windowSize, windowSize);
+
+			OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+				new OneInputStreamOperatorTestHarness<>(op);
+
+			testHarness.setup();
+			testHarness.open();
+
+			testHarness.setProcessingTime(0);
+
+			// inject some elements
+			final int numElementsFirst = 700;
+			final int numElements = 1000;
+			for (int i = 0; i < numElementsFirst; i++) {
+				testHarness.processElement(new StreamRecord<>(i));
+			}
+
+			// draw a snapshot and dispose the window
+			int beforeSnapShot = testHarness.getOutput().size();
+			StreamStateHandle state = testHarness.snapshotLegacy(1L, System.currentTimeMillis());
+			List<Integer> resultAtSnapshot = extractFromStreamRecords(testHarness.getOutput());
+			int afterSnapShot = testHarness.getOutput().size();
+			assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot);
+			assertTrue(afterSnapShot <= numElementsFirst);
+
+			// inject some random elements, which should not show up in the state
+			for (int i = 0; i < 300; i++) {
+				testHarness.processElement(new StreamRecord<>(i + numElementsFirst));
+			}
+
+			testHarness.close();
+			op.dispose();
+
+			// re-create the operator and restore the state
+			op = new AccumulatingProcessingTimeWindowOperator<>(
+				validatingIdentityProcessFunction, identitySelector,
+				IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+				windowSize, windowSize);
+
+			testHarness = new OneInputStreamOperatorTestHarness<>(op);
+
+			testHarness.setup();
+			testHarness.restore(state);
+			testHarness.open();
+
+			// inject some more elements
+			for (int i = numElementsFirst; i < numElements; i++) {
+				testHarness.processElement(new StreamRecord<>(i));
+			}
+
+			testHarness.setProcessingTime(400);
+
+			// get and verify the result
+			List<Integer> finalResult = new ArrayList<>();
+			finalResult.addAll(resultAtSnapshot);
+			List<Integer> finalPartialResult = extractFromStreamRecords(testHarness.getOutput());
+			finalResult.addAll(finalPartialResult);
+			assertEquals(numElements, finalResult.size());
+
+			Collections.sort(finalResult);
+			for (int i = 0; i < numElements; i++) {
+				assertEquals(i, finalResult.get(i).intValue());
+			}
+			testHarness.close();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
 	public void checkpointRestoreWithPendingWindowTumbling() {
 		try {
 			final int windowSize = 200;
@@ -501,6 +782,98 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	}
 
 	@Test
+	public void checkpointRestoreWithPendingWindowSlidingWithProcessFunction() {
+		try {
+			final int factor = 4;
+			final int windowSlide = 50;
+			final int windowSize = factor * windowSlide;
+
+			// sliding window (200 msecs) every 50 msecs
+			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+				new AccumulatingProcessingTimeWindowOperator<>(
+					validatingIdentityProcessFunction, identitySelector,
+					IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+					windowSize, windowSlide);
+
+			OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+				new OneInputStreamOperatorTestHarness<>(op);
+
+			testHarness.setProcessingTime(0);
+
+			testHarness.setup();
+			testHarness.open();
+
+			// inject some elements
+			final int numElements = 1000;
+			final int numElementsFirst = 700;
+
+			for (int i = 0; i < numElementsFirst; i++) {
+				testHarness.processElement(new StreamRecord<>(i));
+			}
+
+			// draw a snapshot
+			List<Integer> resultAtSnapshot = extractFromStreamRecords(testHarness.getOutput());
+			int beforeSnapShot = testHarness.getOutput().size();
+			StreamStateHandle state = testHarness.snapshotLegacy(1L, System.currentTimeMillis());
+			int afterSnapShot = testHarness.getOutput().size();
+			assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot);
+
+			assertTrue(resultAtSnapshot.size() <= factor * numElementsFirst);
+
+			// inject the remaining elements - these should not influence the snapshot
+			for (int i = numElementsFirst; i < numElements; i++) {
+				testHarness.processElement(new StreamRecord<>(i));
+			}
+
+			testHarness.close();
+
+			// re-create the operator and restore the state
+			op = new AccumulatingProcessingTimeWindowOperator<>(
+				validatingIdentityProcessFunction, identitySelector,
+				IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+				windowSize, windowSlide);
+
+			testHarness = new OneInputStreamOperatorTestHarness<>(op);
+
+			testHarness.setup();
+			testHarness.restore(state);
+			testHarness.open();
+
+
+			// inject again the remaining elements
+			for (int i = numElementsFirst; i < numElements; i++) {
+				testHarness.processElement(new StreamRecord<>(i));
+			}
+
+			testHarness.setProcessingTime(50);
+			testHarness.setProcessingTime(100);
+			testHarness.setProcessingTime(150);
+			testHarness.setProcessingTime(200);
+			testHarness.setProcessingTime(250);
+			testHarness.setProcessingTime(300);
+			testHarness.setProcessingTime(350);
+
+			// get and verify the result
+			List<Integer> finalResult = new ArrayList<>(resultAtSnapshot);
+			List<Integer> finalPartialResult = extractFromStreamRecords(testHarness.getOutput());
+			finalResult.addAll(finalPartialResult);
+			assertEquals(factor * numElements, finalResult.size());
+
+			Collections.sort(finalResult);
+			for (int i = 0; i < factor * numElements; i++) {
+				assertEquals(i / factor, finalResult.get(i).intValue());
+			}
+
+			testHarness.close();
+			op.dispose();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
 	public void checkpointRestoreWithPendingWindowSliding() {
 		try {
 			final int factor = 4;
@@ -601,8 +974,12 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			// tumbling window that triggers every 20 milliseconds
 			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
 					new AccumulatingProcessingTimeWindowOperator<>(
-							new StatefulFunction(), identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50);
+							new InternalIterableProcessWindowFunction<>(new StatefulFunction()),
+							identitySelector,
+							IntSerializer.INSTANCE,
+							IntSerializer.INSTANCE,
+							50,
+							50);
 
 			OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
 					new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
@@ -661,7 +1038,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	// ------------------------------------------------------------------------
 
-	private static class StatefulFunction extends RichWindowFunction<Integer, Integer, Integer, TimeWindow> {
+	private static class StatefulFunction extends RichProcessWindowFunction<Integer, Integer, Integer, TimeWindow> {
 
 		// we use a concurrent map here even though there is no concurrency, to
 		// get "volatile" style access to entries
@@ -677,8 +1054,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		}
 
 		@Override
-		public void apply(Integer key,
-						  TimeWindow window,
+		public void process(Integer key,
+						  Context context,
 						  Iterable<Integer> values,
 						  Collector<Integer> out) throws Exception {
 			for (Integer i : values) {


[5/8] flink git commit: [FLINK-4997] [streaming] Introduce ProcessWindowFunction

Posted by al...@apache.org.
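
For readers skimming this thread: as the diffs in this and the following messages show, ProcessWindowFunction is the keyed-window counterpart of WindowFunction that is handed a Context argument instead of the window itself; the window is retrieved via context.window(). A minimal end-to-end sketch, assembled only from API calls that appear in these diffs (the input data, the printing sink and the job name are illustrative, not taken from the commit):

	import org.apache.flink.api.java.functions.KeySelector;
	import org.apache.flink.api.java.tuple.Tuple2;
	import org.apache.flink.api.java.tuple.Tuple3;
	import org.apache.flink.streaming.api.TimeCharacteristic;
	import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
	import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
	import org.apache.flink.streaming.api.windowing.time.Time;
	import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
	import org.apache.flink.util.Collector;

	public class ProcessWindowFunctionSketch {

		public static void main(String[] args) throws Exception {
			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
			// ingestion time: timestamps are assigned automatically, and the finite source
			// emits a final MAX watermark when it finishes, so the window below does fire
			env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

			env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2), Tuple2.of("ciao", 3))
				.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
					@Override
					public String getKey(Tuple2<String, Integer> value) {
						return value.f0;
					}
				})
				.timeWindow(Time.seconds(1))
				.process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow>() {
					@Override
					public void process(
							String key,
							Context context,
							Iterable<Tuple2<String, Integer>> elements,
							Collector<Tuple3<String, Long, Long>> out) {
						int sum = 0;
						for (Tuple2<String, Integer> e : elements) {
							sum += e.f1;
						}
						// the window is reached through the context instead of a parameter
						TimeWindow window = context.window();
						out.collect(new Tuple3<>(key + "-" + sum, window.getStart(), window.getEnd()));
					}
				})
				.print();

			env.execute("ProcessWindowFunction sketch");
		}
	}
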
http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 104bc7b..a9c3ef6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -59,7 +60,9 @@ import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
@@ -432,6 +435,78 @@ public class WindowOperatorTest extends TestLogger {
 
 	@Test
 	@SuppressWarnings("unchecked")
+	public void testSessionWindowsWithProcessFunction() throws Exception {
+		closeCalled.set(0);
+
+		final int SESSION_SIZE = 3;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
+				inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
+				EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalIterableProcessWindowFunction<>(new SessionProcessWindowFunction()),
+				EventTimeTrigger.create(),
+				0);
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		// add elements out-of-order
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
+
+		// do a snapshot, close and restore again
+		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.close();
+		testHarness.setup();
+		testHarness.initializeState(snapshot);
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5501));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 6050));
+
+		testHarness.processWatermark(new Watermark(12000));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), 5499));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), 9049));
+		expectedOutput.add(new Watermark(12000));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 15000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), 15000));
+
+		testHarness.processWatermark(new Watermark(17999));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), 17999));
+		expectedOutput.add(new Watermark(17999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
+		testHarness.close();
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
 	public void testReduceSessionWindows() throws Exception {
 		closeCalled.set(0);
 
@@ -500,6 +575,76 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.close();
 	}
 
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testReduceSessionWindowsWithProcessFunction() throws Exception {
+		closeCalled.set(0);
+
+		final int SESSION_SIZE = 3;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>(
+				"window-contents", new SumReducer(), inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
+				EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalSingleValueProcessWindowFunction<>(new ReducedProcessSessionWindowFunction()),
+				EventTimeTrigger.create(),
+				0);
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		// add elements out-of-order
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
+
+		// do a snapshot, close and restore again
+		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.close();
+		testHarness.setup();
+		testHarness.initializeState(snapshot);
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5501));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 6050));
+
+		testHarness.processWatermark(new Watermark(12000));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), 5499));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), 9049));
+		expectedOutput.add(new Watermark(12000));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 15000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), 15000));
+
+		testHarness.processWatermark(new Watermark(17999));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), 17999));
+		expectedOutput.add(new Watermark(17999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
+		testHarness.close();
+	}
+
 	/**
 	 * This tests whether merging works correctly with the CountTrigger.
 	 * @throws Exception
@@ -2379,6 +2524,38 @@ public class WindowOperatorTest extends TestLogger {
 		}
 	}
 
+	public static class SessionProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void process(String key,
+				Context context,
+				Iterable<Tuple2<String, Integer>> values,
+				Collector<Tuple3<String, Long, Long>> out) throws Exception {
+			int sum = 0;
+			for (Tuple2<String, Integer> i: values) {
+				sum += i.f1;
+			}
+			String resultString = key + "-" + sum;
+			TimeWindow window = context.window();
+			out.collect(new Tuple3<>(resultString, window.getStart(), window.getEnd()));
+		}
+	}
+
+	public static class ReducedProcessSessionWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void process(String key,
+				Context context,
+				Iterable<Tuple2<String, Integer>> values,
+				Collector<Tuple3<String, Long, Long>> out) throws Exception {
+			TimeWindow window = context.window();
+			for (Tuple2<String, Integer> val: values) {
+				out.collect(new Tuple3<>(key + "-" + val.f1, window.getStart(), window.getEnd()));
+			}
+		}
+	}
 
 	public static class PointSessionWindows extends EventTimeSessionWindows {
 		private static final long serialVersionUID = 1L;

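One behavior worth spelling out from testReduceSessionWindowsWithProcessFunction above: when a pre-aggregating ReduceFunction is supplied, the window contents are reduced incrementally and the ProcessWindowFunction receives an Iterable with exactly one element, the reduced value, while window metadata stays available through context.window() (this is also what the InternalSingleValueProcessWindowFunction test earlier in this thread asserts with its one-element iterable). A minimal, self-contained sketch of the user-facing form of that combination, built only from API calls that appear in these diffs (input data, sink and job name are illustrative):

	import org.apache.flink.api.common.functions.ReduceFunction;
	import org.apache.flink.api.java.functions.KeySelector;
	import org.apache.flink.api.java.tuple.Tuple2;
	import org.apache.flink.api.java.tuple.Tuple3;
	import org.apache.flink.streaming.api.TimeCharacteristic;
	import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
	import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
	import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
	import org.apache.flink.streaming.api.windowing.time.Time;
	import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
	import org.apache.flink.util.Collector;

	public class ReduceWithProcessWindowFunctionSketch {

		public static void main(String[] args) throws Exception {
			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
			env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

			env.fromElements(Tuple2.of("key1", 1), Tuple2.of("key1", 2), Tuple2.of("key2", 3))
				.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
					@Override
					public String getKey(Tuple2<String, Integer> value) {
						return value.f0;
					}
				})
				.window(EventTimeSessionWindows.withGap(Time.seconds(3)))
				.reduce(
					new ReduceFunction<Tuple2<String, Integer>>() {
						@Override
						public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
							return Tuple2.of(a.f0, a.f1 + b.f1);
						}
					},
					new ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow>() {
						@Override
						public void process(
								String key,
								Context context,
								Iterable<Tuple2<String, Integer>> reduced,   // exactly one pre-reduced element
								Collector<Tuple3<String, Long, Long>> out) {
							Tuple2<String, Integer> sum = reduced.iterator().next();
							TimeWindow window = context.window();
							out.collect(new Tuple3<>(key + "-" + sum.f1, window.getStart(), window.getEnd()));
						}
					})
				.print();

			env.execute("Reduce with ProcessWindowFunction sketch");
		}
	}
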
http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
index 1e3e3d5..7d37d1a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
@@ -18,17 +18,22 @@
 package org.apache.flink.test.streaming.runtime;
 
 import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -114,6 +119,79 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 	}
 
 	@Test
+	public void testFoldProcessWindow() throws Exception {
+
+		testResults = new ArrayList<>();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(1);
+
+		DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+				ctx.collect(Tuple2.of("a", 0));
+				ctx.collect(Tuple2.of("a", 1));
+				ctx.collect(Tuple2.of("a", 2));
+
+				ctx.collect(Tuple2.of("b", 3));
+				ctx.collect(Tuple2.of("b", 4));
+				ctx.collect(Tuple2.of("b", 5));
+
+				ctx.collect(Tuple2.of("a", 6));
+				ctx.collect(Tuple2.of("a", 7));
+				ctx.collect(Tuple2.of("a", 8));
+
+				// source is finite, so it will have an implicit MAX watermark when it finishes
+			}
+
+			@Override
+			public void cancel() {}
+
+		}).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());
+
+		source1
+			.keyBy(0)
+			.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+			.fold(Tuple2.of(0, "R:"), new FoldFunction<Tuple2<String, Integer>, Tuple2<Integer, String>>() {
+				@Override
+				public Tuple2<Integer, String> fold(Tuple2<Integer, String> accumulator, Tuple2<String, Integer> value) throws Exception {
+					accumulator.f1 += value.f0;
+					accumulator.f0 += value.f1;
+					return accumulator;
+				}
+			}, new ProcessWindowFunction<Tuple2<Integer, String>, Tuple3<String, Integer, Integer>, Tuple, TimeWindow>() {
+				@Override
+				public void process(Tuple tuple, Context context, Iterable<Tuple2<Integer, String>> elements, Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
+					int i = 0;
+					for (Tuple2<Integer, String> in : elements) {
+						out.collect(new Tuple3<>(in.f1, in.f0, i++));
+					}
+				}
+			})
+			.addSink(new SinkFunction<Tuple3<String, Integer, Integer>>() {
+				@Override
+				public void invoke(Tuple3<String, Integer, Integer> value) throws Exception {
+					testResults.add(value.toString());
+				}
+			});
+
+		env.execute("Fold Process Window Test");
+
+		List<String> expectedResult = Arrays.asList(
+			"(R:aaa,3,0)",
+			"(R:aaa,21,0)",
+			"(R:bbb,12,0)");
+
+		Collections.sort(expectedResult);
+		Collections.sort(testResults);
+
+		Assert.assertEquals(expectedResult, testResults);
+	}
+
+	@Test
 	public void testFoldAllWindow() throws Exception {
 
 		testResults = new ArrayList<>();

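To make the expected output of testFoldProcessWindow above easy to check by hand (assuming Tuple2TimestampExtractor, which is outside this excerpt, assigns each record's integer field as its event timestamp): the 3 ms tumbling windows group the "a" records 0, 1, 2 (fold result "R:" + "aaa" and 0 + 1 + 2 = 3), the "b" records 3, 4, 5 ("R:bbb", 3 + 4 + 5 = 12), and the later "a" records 6, 7, 8 ("R:aaa", 6 + 7 + 8 = 21). Each window holds a single folded value, so the ProcessWindowFunction emits it with index 0, which yields exactly the three expected strings "(R:aaa,3,0)", "(R:bbb,12,0)" and "(R:aaa,21,0)".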

[2/8] flink git commit: [FLINK-5237] Consolidate and harmonize Window Translation Tests

Posted by al...@apache.org.
[FLINK-5237] Consolidate and harmonize Window Translation Tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5368a7d3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5368a7d3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5368a7d3

Branch: refs/heads/master
Commit: 5368a7d32d96beb1b8298b87d9ea6d42ea306947
Parents: fe2a301
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Nov 24 08:14:48 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 17 17:15:51 2017 +0100

----------------------------------------------------------------------
 .../operators/StateDescriptorPassingTest.java   |  26 +
 .../windowing/WindowTranslationTest.java        | 718 +++++++++++++++--
 .../ScalaProcessWindowFunctionWrapper.scala     |  16 +-
 .../api/scala/WindowTranslationTest.scala       | 766 +++++++++++++++++--
 4 files changed, 1420 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
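
All of the translation tests added in this commit (Java below, Scala alongside) follow one pattern: build a small keyed, windowed pipeline, pull the OneInputTransformation out of the resulting stream, downcast its operator to WindowOperator, assert on trigger, window assigner and state descriptor, and finally push a single element through the operator to make sure it emits output. Condensed into a skeleton, it reads roughly as follows (this is meant to be read as a member of the WindowTranslationTest class shown below, since it reuses that class's TupleKeySelector, DummyReducer and processElementAndEnsureOutput helpers):

	@Test
	public void translationTestSkeleton() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));

		// 1. build the pipeline under test
		DataStream<Tuple2<String, Integer>> window = source
				.keyBy(new TupleKeySelector())
				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
				.reduce(new DummyReducer());

		// 2. dig out the operator the API translated to
		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();

		// 3. assert on its configuration
		Assert.assertTrue(operator instanceof WindowOperator);
		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);

		// 4. make sure the operator actually emits output for one input element
		processElementAndEnsureOutput(
				operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
	}
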


http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
index 26cb7ac..813ca96 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -136,6 +137,31 @@ public class StateDescriptorPassingTest {
 	}
 
 	@Test
+	public void testProcessWindowState() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+
+		DataStream<File> src = env.fromElements(new File("/"));
+
+		SingleOutputStreamOperator<?> result = src
+				.keyBy(new KeySelector<File, String>() {
+					@Override
+					public String getKey(File value) {
+						return null;
+					}
+				})
+				.timeWindow(Time.milliseconds(1000))
+				.process(new ProcessWindowFunction<File, String, String, TimeWindow>() {
+					@Override
+					public void process(String s, Context ctx,
+							Iterable<File> input, Collector<String> out) {}
+				});
+
+		validateListStateDescriptorConfigured(result);
+	}
+
+	@Test
 	public void testFoldWindowAllState() throws Exception {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index f72a2f1..b899948 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -31,14 +31,17 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
@@ -112,7 +115,7 @@ public class WindowTranslationTest {
 	 * in a {@code AggregatingState}.
 	 */
 	@Test(expected = UnsupportedOperationException.class)
-	public void testAgrgegateWithRichFunctionFails() throws Exception {
+	public void testAggregateWithRichFunctionFails() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
@@ -405,6 +408,82 @@ public class WindowTranslationTest {
 		processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
 	}
 
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testReduceWithProcessWindowFunctionEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.reduce(reducer, new ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void process(String key,
+							Context ctx,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple3<String, String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+		processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testReduceWithProcessWindowFunctionProcessingTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.reduce(new DummyReducer(), new ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void process(String tuple,
+							Context ctx,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple3<String, String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+		processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
 	/**
 	 * Test for the deprecated .apply(Reducer, WindowFunction).
 	 */
@@ -447,6 +526,50 @@ public class WindowTranslationTest {
 		processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
 	}
 
+	/**
+	 * Test for the deprecated .apply(Reducer, WindowFunction) in combination with an Evictor.
+	 */
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testApplyWithPreReducerAndEvictor() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.evictor(CountEvictor.of(100))
+				.apply(reducer, new WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(String key,
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple3<String, String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+
 	// ------------------------------------------------------------------------
 	//  Aggregate Translation Tests
 	// ------------------------------------------------------------------------
@@ -463,13 +586,13 @@ public class WindowTranslationTest {
 				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.aggregate(new DummyAggregationFunction());
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = 
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
 				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
 
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
 
 		Assert.assertTrue(operator instanceof WindowOperator);
-		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = 
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
 				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
 
 		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
@@ -492,13 +615,13 @@ public class WindowTranslationTest {
 				.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.aggregate(new DummyAggregationFunction());
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = 
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
 				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
 
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
 
 		Assert.assertTrue(operator instanceof WindowOperator);
-		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = 
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
 				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
 
 		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
@@ -529,7 +652,7 @@ public class WindowTranslationTest {
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
 
 		Assert.assertTrue(operator instanceof WindowOperator);
-		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = 
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
 				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
 
 		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
@@ -569,6 +692,66 @@ public class WindowTranslationTest {
 				operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
 	}
 
+	@Test
+	public void testAggregateWithProcessWindowFunctionEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
+
+		processElementAndEnsureOutput(
+				operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	public void testAggregateWithProcessWindowFunctionProcessingTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
+
+		processElementAndEnsureOutput(
+				operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
 	// ------------------------------------------------------------------------
 	//  Fold Translation Tests
 	// ------------------------------------------------------------------------
@@ -664,83 +847,269 @@ public class WindowTranslationTest {
 	@SuppressWarnings("rawtypes")
 	public void testFoldWithWindowFunctionProcessingTime() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(String key,
+							TimeWindow window,
+							Iterable<Tuple3<String, String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple3<String, String, Integer> in : values) {
+							out.collect(new Tuple2<>(in.f0, in.f2));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testFoldWithProcessWindowFunctionEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.fold(new Tuple3<>("", "", 0), new DummyFolder(), new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void process(String key,
+							Context ctx,
+							Iterable<Tuple3<String, String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple3<String, String, Integer> in : values) {
+							out.collect(new Tuple2<>(in.f0, in.f2));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testFoldWithProcessWindowFunctionProcessingTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void process(String key,
+							Context ctx,
+							Iterable<Tuple3<String, String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple3<String, String, Integer> in : values) {
+							out.collect(new Tuple2<>(in.f0, in.f2));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testApplyWithPreFolderEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.apply(new Tuple3<>("", "", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(String key,
+							TimeWindow window,
+							Iterable<Tuple3<String, String, Integer>> values,
+							Collector<Tuple3<String, String, Integer>> out) throws Exception {
+						for (Tuple3<String, String, Integer> in : values) {
+							out.collect(new Tuple3<>(in.f0, in.f1, in.f2));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testApplyWithPreFolderAndEvictor() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.evictor(CountEvictor.of(100))
+				.apply(new Tuple3<>("", "", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(String key,
+							TimeWindow window,
+							Iterable<Tuple3<String, String, Integer>> values,
+							Collector<Tuple3<String, String, Integer>> out) throws Exception {
+						for (Tuple3<String, String, Integer> in : values) {
+							out.collect(new Tuple3<>(in.f0, in.f1, in.f2));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  Apply Translation Tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testApplyEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
 		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
-		DataStream<Tuple2<String, Integer>> window = source
+		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(new TupleKeySelector())
-				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
 					public void apply(String key,
 							TimeWindow window,
-							Iterable<Tuple3<String, String, Integer>> values,
+							Iterable<Tuple2<String, Integer>> values,
 							Collector<Tuple2<String, Integer>> out) throws Exception {
-						for (Tuple3<String, String, Integer> in : values) {
-							out.collect(new Tuple2<>(in.f0, in.f2));
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(in);
 						}
 					}
 				});
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
-				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
 		Assert.assertTrue(operator instanceof WindowOperator);
 		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
-		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
-		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
-		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
 
 		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
 	}
 
 	@Test
 	@SuppressWarnings("rawtypes")
-	public void testApplyWithPreFolderEventTime() throws Exception {
+	public void testApplyProcessingTime() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
 		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
-		DataStream<Tuple3<String, String, Integer>> window = source
+		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(new TupleKeySelector())
-				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.apply(new Tuple3<>("", "", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
 					public void apply(String key,
 							TimeWindow window,
-							Iterable<Tuple3<String, String, Integer>> values,
-							Collector<Tuple3<String, String, Integer>> out) throws Exception {
-						for (Tuple3<String, String, Integer> in : values) {
-							out.collect(new Tuple3<>(in.f0, in. f1, in.f2));
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(in);
 						}
 					}
 				});
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
-				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String,String, Integer>> operator = transform.getOperator();
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
 		Assert.assertTrue(operator instanceof WindowOperator);
 		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
-		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
-		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
 
 		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
-	}
 
-	// ------------------------------------------------------------------------
-	//  Apply Translation Tests
-	// ------------------------------------------------------------------------
+	}
 
 	@Test
 	@SuppressWarnings("rawtypes")
-	public void testApplyEventTime() throws Exception {
+	public void testProcessEventTime() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
@@ -749,12 +1118,12 @@ public class WindowTranslationTest {
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(new TupleKeySelector())
 				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+				.process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
-					public void apply(String key,
-							TimeWindow window,
+					public void process(String key,
+							Context ctx,
 							Iterable<Tuple2<String, Integer>> values,
 							Collector<Tuple2<String, Integer>> out) throws Exception {
 						for (Tuple2<String, Integer> in : values) {
@@ -776,7 +1145,7 @@ public class WindowTranslationTest {
 
 	@Test
 	@SuppressWarnings("rawtypes")
-	public void testApplyProcessingTimeTime() throws Exception {
+	public void testProcessProcessingTime() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
@@ -785,12 +1154,12 @@ public class WindowTranslationTest {
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(new TupleKeySelector())
 				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+				.process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
-					public void apply(String key,
-							TimeWindow window,
+					public void process(String key,
+							Context ctx,
 							Iterable<Tuple2<String, Integer>> values,
 							Collector<Tuple2<String, Integer>> out) throws Exception {
 						for (Tuple2<String, Integer> in : values) {
@@ -903,6 +1272,43 @@ public class WindowTranslationTest {
 
 	@Test
 	@SuppressWarnings("rawtypes")
+	public void testProcessWithCustomTrigger() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.trigger(CountTrigger.of(1))
+				.process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void process(String key,
+							Context ctx,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(in);
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
 	public void testReduceWithEvictor() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
@@ -930,6 +1336,121 @@ public class WindowTranslationTest {
 	}
 
 	@Test
+	@SuppressWarnings("rawtypes")
+	public void testReduceWithEvictorAndProcessFunction() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(0)
+				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.evictor(CountEvictor.of(100))
+				.reduce(
+						reducer,
+						new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+							@Override
+							public void process(
+									Tuple tuple,
+									Context context,
+									Iterable<Tuple2<String, Integer>> elements,
+									Collector<Tuple2<String, Integer>> out) throws Exception {
+								for (Tuple2<String, Integer> in : elements) {
+									out.collect(in);
+								}
+							}
+						});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof EvictingWindowOperator);
+		EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	public void testAggregateWithEvictor() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(new TupleKeySelector())
+				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.evictor(CountEvictor.of(100))
+				.aggregate(new DummyAggregationFunction());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(
+				winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	public void testAggregateWithEvictorAndProcessFunction() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(new TupleKeySelector())
+				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.evictor(CountEvictor.of(100))
+				.aggregate(
+						new DummyAggregationFunction(),
+						new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+							@Override
+							public void process(
+									String s,
+									Context context,
+									Iterable<Tuple2<String, Integer>> elements,
+									Collector<Tuple2<String, Integer>> out) throws Exception {
+								for (Tuple2<String, Integer> in : elements) {
+									out.collect(in);
+								}
+							}
+						});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(
+				winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+
+	@Test
 	@SuppressWarnings({"rawtypes", "unchecked"})
 	public void testFoldWithEvictor() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -958,6 +1479,48 @@ public class WindowTranslationTest {
 	}
 
 	@Test
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	public void testFoldWithEvictorAndProcessFunction() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window1 = source
+				.keyBy(0)
+				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.evictor(CountEvictor.of(100))
+				.fold(
+						new Tuple3<>("", "", 1),
+						new DummyFolder(),
+						new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, Tuple, TimeWindow>() {
+							@Override
+							public void process(
+									Tuple tuple,
+									Context context,
+									Iterable<Tuple3<String, String, Integer>> elements,
+									Collector<Tuple3<String, String, Integer>> out) throws Exception {
+								for (Tuple3<String, String, Integer> in : elements) {
+									out.collect(in);
+								}
+							}
+						});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof EvictingWindowOperator);
+		EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		winOperator.setOutputType((TypeInformation) window1.getType(), new ExecutionConfig());
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
 	@SuppressWarnings("rawtypes")
 	public void testApplyWithEvictor() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -996,6 +1559,45 @@ public class WindowTranslationTest {
 		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
 	}
 
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testProcessWithEvictor() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.trigger(CountTrigger.of(1))
+				.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
+				.process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void process(String key,
+							Context ctx,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(in);
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof EvictingWindowOperator);
+		EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+		Assert.assertTrue(winOperator.getEvictor() instanceof TimeEvictor);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
 	/**
 	 * Ensure that we get some output from the given operator when pushing in an element and
 	 * setting watermark and processing time to {@code Long.MAX_VALUE}.
@@ -1012,6 +1614,12 @@ public class WindowTranslationTest {
 						keySelector,
 						keyType);
 
+		if (operator instanceof OutputTypeConfigurable) {
+			// use a dummy type since window functions just need the ExecutionConfig
+			// this is also only needed for Fold, which we're getting rid of soon.
+			((OutputTypeConfigurable) operator).setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
+		}
+
 		testHarness.open();
 
 		testHarness.setProcessingTime(0);
@@ -1050,7 +1658,7 @@ public class WindowTranslationTest {
 		}
 	}
 
-	private static class DummyAggregationFunction 
+	private static class DummyAggregationFunction
 			implements AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {
 
 		@Override
@@ -1096,7 +1704,7 @@ public class WindowTranslationTest {
 		}
 	}
 
-	private static class TestWindowFunction 
+	private static class TestWindowFunction
 			implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow> {
 
 		@Override
@@ -1111,6 +1719,22 @@ public class WindowTranslationTest {
 		}
 	}
 
+	private static class TestProcessWindowFunction
+			extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow> {
+
+		@Override
+		public void process(String key,
+				Context ctx,
+				Iterable<Tuple2<String, Integer>> values,
+				Collector<Tuple3<String, String, Integer>> out) throws Exception {
+
+			for (Tuple2<String, Integer> in : values) {
+				out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+			}
+		}
+	}
+
+
 	private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index 4a20371..23293a6 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.streaming.api.scala.function.util
 
-import org.apache.flink.api.common.functions.{IterationRuntimeContext, RuntimeContext}
-import org.apache.flink.api.java.operators.translation.WrappingFunction
 import org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => JProcessWindowFunction}
 import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
@@ -37,8 +35,7 @@ import scala.collection.JavaConverters._
   */
 final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
     private[this] val func: ProcessWindowFunction[IN, OUT, KEY, W])
-    extends WrappingFunction[ProcessWindowFunction[IN, OUT, KEY, W]](func)
-    with JProcessWindowFunctionTrait[IN, OUT, KEY, W] {
+    extends JProcessWindowFunction[IN, OUT, KEY, W] {
 
   override def process(
       key: KEY,
@@ -50,15 +47,4 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
     }
     func.process(key, ctx, elements.asScala, out)
   }
-
-  override def getRuntimeContext: RuntimeContext = {
-    throw new RuntimeException("This should never be called")
-  }
-
-  override def getIterationRuntimeContext: IterationRuntimeContext = {
-    throw new RuntimeException("This should never be called")
-  }
 }
-
-private trait JProcessWindowFunctionTrait[IN, OUT, KEY, W]
-  extends JProcessWindowFunction[IN, OUT, KEY, W]


[3/8] flink git commit: [FLINK-4997] Add doc for ProcessWindowFunction

Posted by al...@apache.org.
[FLINK-4997] Add doc for ProcessWindowFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4f047e13
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4f047e13
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4f047e13

Branch: refs/heads/master
Commit: 4f047e13518ad2eb493903179e38eb174a37994c
Parents: 86dff0e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Feb 9 11:56:46 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 17 17:15:51 2017 +0100

----------------------------------------------------------------------
 docs/dev/windows.md | 105 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 105 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4f047e13/docs/dev/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/windows.md b/docs/dev/windows.md
index f8be08f..73f348e 100644
--- a/docs/dev/windows.md
+++ b/docs/dev/windows.md
@@ -565,6 +565,108 @@ The example shows a `WindowFunction` to count the elements in a window. In addit
 
 <span class="label label-danger">Attention</span> Note that using `WindowFunction` for simple aggregates such as count is quite inefficient. The next section shows how a `ReduceFunction` can be combined with a `WindowFunction` to get both incremental aggregation and the added information of a `WindowFunction`.
 
+### ProcessWindowFunction
+
+In places where a `WindowFunction` can be used you can also use a `ProcessWindowFunction`. This
+is very similar to a `WindowFunction`, except that the interface lets you query more information
+about the context in which the window evaluation happens.
+
+This is the `ProcessWindowFunction` interface:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
+
+    /**
+     * Evaluates the window and outputs none or several elements.
+     *
+     * @param key The key for which this window is evaluated.
+     * @param context The context in which the window is being evaluated.
+     * @param elements The elements in the window being evaluated.
+     * @param out A collector for emitting elements.
+     *
+     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+     */
+    public abstract void process(
+            KEY key,
+            Context context,
+            Iterable<IN> elements,
+            Collector<OUT> out) throws Exception;
+
+    /**
+     * The context holding window metadata
+     */
+    public abstract class Context {
+        /**
+         * @return The window that is being evaluated.
+         */
+        public abstract W window();
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {
+
+  /**
+    * Evaluates the window and outputs none or several elements.
+    *
+    * @param key      The key for which this window is evaluated.
+    * @param context  The context in which the window is being evaluated.
+    * @param elements The elements in the window being evaluated.
+    * @param out      A collector for emitting elements.
+    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+    */
+  @throws[Exception]
+  def process(
+      key: KEY,
+      context: Context,
+      elements: Iterable[IN],
+      out: Collector[OUT])
+
+  /**
+    * The context holding window metadata
+    */
+  abstract class Context {
+    /**
+      * @return The window that is being evaluated.
+      */
+    def window: W
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+It can be used like this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple2<String, Long>> input = ...;
+
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .process(new MyProcessWindowFunction());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[(String, Long)] = ...
+
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .process(new MyProcessWindowFunction())
+{% endhighlight %}
+</div>
+</div>
+
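+A `ProcessWindowFunction` that could stand in for the `MyProcessWindowFunction` placeholder above
+might count the elements in a window, roughly like this (a minimal sketch that assumes the stream
+is keyed by the `String` field of the tuples and uses a time-based window assigner):
+
+{% highlight java %}
+public class MyProcessWindowFunction
+    extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
+
+  @Override
+  public void process(
+      String key,
+      Context context,
+      Iterable<Tuple2<String, Long>> elements,
+      Collector<String> out) {
+    // count the elements in the window and emit the count together with the window metadata
+    long count = 0;
+    for (Tuple2<String, Long> element : elements) {
+      count++;
+    }
+    out.collect("Window: " + context.window() + " count: " + count);
+  }
+}
+{% endhighlight %}
+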
 ### WindowFunction with Incremental Aggregation
 
 A `WindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction` to
@@ -573,6 +675,9 @@ When the window is closed, the `WindowFunction` will be provided with the aggreg
 This allows to incrementally compute windows while having access to the
 additional window meta information of the `WindowFunction`.
 
+<span class="label label-info">Note</span> You can also use a `ProcessWindowFunction` instead of
+a `WindowFunction` for incremental window aggregation.
+
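+For example, combining a `ReduceFunction` with a `ProcessWindowFunction` could look roughly like
+this (a sketch; `MyReduceFunction` and `MyProcessWindowFunction` are placeholder names):
+
+{% highlight java %}
+DataStream<Tuple2<String, Long>> input = ...;
+
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
+{% endhighlight %}
+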
 #### Incremental Window Aggregation with FoldFunction
 
 The following example shows how an incremental `FoldFunction` can be combined with


[7/8] flink git commit: [FLINK-4997] Add ProcessWindowFunction support for .aggregate()

Posted by al...@apache.org.
[FLINK-4997] Add ProcessWindowFunction support for .aggregate()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe2a3016
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe2a3016
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe2a3016

Branch: refs/heads/master
Commit: fe2a3016f98e45d0c94a3fa1ed8c17b89a516859
Parents: 4f047e1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Feb 7 14:38:25 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 17 17:15:51 2017 +0100

----------------------------------------------------------------------
 .../api/datastream/WindowedStream.java          | 128 +++++++++++++++++++
 .../InternalAggregateProcessWindowFunction.java |  84 ++++++++++++
 .../functions/InternalWindowFunctionTest.java   | 104 +++++++++++++++
 .../streaming/api/scala/WindowedStream.scala    |  30 +++++
 4 files changed, 346 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fe2a3016/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 45eaae5..6809df0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -64,6 +64,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
@@ -906,6 +907,133 @@ public class WindowedStream<T, K, W extends Window> {
 		return input.transform(opName, resultType, operator);
 	}
 
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>Arriving data is incrementally aggregated using the given aggregate function. This means
+	 * that the window function typically has only a single value to process when called.
+	 *
+	 * @param aggFunction The aggregate function that is used for incremental aggregation.
+	 * @param windowFunction The window function.
+	 *
+	 * @return The data stream that is the result of applying the window function to the window.
+	 *
+	 * @param <ACC> The type of the AggregateFunction's accumulator
+	 * @param <V> The type of AggregateFunction's result, and the WindowFunction's input
+	 * @param <R> The type of the elements in the resulting stream, equal to the
+	 *            WindowFunction's result type
+	 */
+	@PublicEvolving
+	public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
+			AggregateFunction<T, ACC, V> aggFunction,
+			ProcessWindowFunction<V, R, K, W> windowFunction) {
+
+		checkNotNull(aggFunction, "aggFunction");
+		checkNotNull(windowFunction, "windowFunction");
+
+		TypeInformation<ACC> accumulatorType = TypeExtractor.getAggregateFunctionAccumulatorType(
+				aggFunction, input.getType(), null, false);
+
+		TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
+				aggFunction, input.getType(), null, false);
+
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+				windowFunction, ProcessWindowFunction.class, true, true, aggResultType, null, false);
+
+		return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>Arriving data is incrementally aggregated using the given aggregate function. This means
+	 * that the window function typically has only a single value to process when called.
+	 *
+	 * @param aggregateFunction The aggregation function that is used for incremental aggregation.
+	 * @param windowFunction The window function.
+	 * @param accumulatorType Type information for the internal accumulator type of the aggregation function
+	 * @param aggregateResultType Type information for the result type of the aggregate function
+	 * @param resultType Type information for the result type of the window function
+	 *
+	 * @return The data stream that is the result of applying the window function to the window.
+	 *
+	 * @param <ACC> The type of the AggregateFunction's accumulator
+	 * @param <V> The type of AggregateFunction's result, and the WindowFunction's input
+	 * @param <R> The type of the elements in the resulting stream, equal to the
+	 *            WindowFunction's result type
+	 */
+	@PublicEvolving
+	public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
+			AggregateFunction<T, ACC, V> aggregateFunction,
+			ProcessWindowFunction<V, R, K, W> windowFunction,
+			TypeInformation<ACC> accumulatorType,
+			TypeInformation<V> aggregateResultType,
+			TypeInformation<R> resultType) {
+
+		checkNotNull(aggregateFunction, "aggregateFunction");
+		checkNotNull(windowFunction, "windowFunction");
+		checkNotNull(accumulatorType, "accumulatorType");
+		checkNotNull(aggregateResultType, "aggregateResultType");
+		checkNotNull(resultType, "resultType");
+
+		if (aggregateFunction instanceof RichFunction) {
+			throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
+		}
+
+		//clean the closures
+		windowFunction = input.getExecutionEnvironment().clean(windowFunction);
+		aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "WindowedStream." + callLocation;
+
+		String opName;
+		KeySelector<T, K> keySel = input.getKeySelector();
+
+		OneInputStreamOperator<T, R> operator;
+
+		if (evictor != null) {
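+			// with an evictor the raw elements have to be kept in list state, so the aggregation
+			// cannot happen incrementally; InternalAggregateProcessWindowFunction applies the
+			// AggregateFunction when the window is evaluated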
+			@SuppressWarnings({"unchecked", "rawtypes"})
+			TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+					(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+			ListStateDescriptor<StreamRecord<T>> stateDesc =
+					new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+			operator = new EvictingWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					stateDesc,
+					new InternalAggregateProcessWindowFunction<>(aggregateFunction, windowFunction),
+					trigger,
+					evictor,
+					allowedLateness);
+
+		} else {
+			AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<>("window-contents",
+					aggregateFunction, accumulatorType.createSerializer(getExecutionEnvironment().getConfig()));
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+			operator = new WindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					stateDesc,
+					new InternalSingleValueProcessWindowFunction<>(windowFunction),
+					trigger,
+					allowedLateness);
+		}
+
+		return input.transform(opName, resultType, operator);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Window Function (apply)
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/fe2a3016/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
new file mode 100644
index 0000000..433da9b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+/**
+ * Internal window function for wrapping a {@link ProcessWindowFunction} that takes an
+ * {@code Iterable} and an {@link AggregateFunction}.
+ *
+ * @param <K> The key type
+ * @param <W> The window type
+ * @param <T> The type of the input to the AggregateFunction
+ * @param <ACC> The type of the AggregateFunction's accumulator
+ * @param <V> The type of the AggregateFunction's result, and the input to the WindowFunction
+ * @param <R> The result type of the WindowFunction
+ */
+public final class InternalAggregateProcessWindowFunction<T, ACC, V, R, K, W extends Window>
+		extends WrappingFunction<ProcessWindowFunction<V, R, K, W>>
+		implements InternalWindowFunction<Iterable<T>, R, K, W> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final AggregateFunction<T, ACC, V> aggFunction;
+
+	public InternalAggregateProcessWindowFunction(
+			AggregateFunction<T, ACC, V> aggFunction,
+			ProcessWindowFunction<V, R, K, W> windowFunction) {
+		super(windowFunction);
+		this.aggFunction = aggFunction;
+	}
+	
+	@Override
+	public void apply(K key, final W window, Iterable<T> input, Collector<R> out) throws Exception {
+		ProcessWindowFunction<V, R, K, W> wrappedFunction = this.wrappedFunction;
+		ProcessWindowFunction<V, R, K, W>.Context context = wrappedFunction.new Context() {
+			@Override
+			public W window() {
+				return window;
+			}
+		};
+
+		final ACC acc = aggFunction.createAccumulator();
+
+		for (T val : input) {
+			aggFunction.add(val, acc);
+		}
+
+		wrappedFunction.process(key, context, Collections.singletonList(aggFunction.getResult(acc)), out);
+	}
+
+	@Override
+	public RuntimeContext getRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+
+	@Override
+	public IterationRuntimeContext getIterationRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe2a3016/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index 3c73035..e49a496 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators.windowing.functions;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -29,6 +30,7 @@ import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunct
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
@@ -39,6 +41,17 @@ import org.apache.flink.util.Collector;
 import org.hamcrest.collection.IsIterableContainingInOrder;
 import org.junit.Test;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.collection.IsMapContaining.hasEntry;
+import static org.hamcrest.core.AllOf.allOf;
 import static org.mockito.Mockito.*;
 
 public class InternalWindowFunctionTest {
@@ -288,6 +301,84 @@ public class InternalWindowFunctionTest {
 		verify(mock).close();
 	}
 
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testInternalAggregateProcessWindowFunction() throws Exception {
+
+		AggregateProcessWindowFunctionMock mock = mock(AggregateProcessWindowFunctionMock.class);
+
+		InternalAggregateProcessWindowFunction<Long, Set<Long>, Map<Long, Long>, String, Long, TimeWindow> windowFunction =
+				new InternalAggregateProcessWindowFunction<>(new AggregateFunction<Long, Set<Long>, Map<Long, Long>>() {
+					private static final long serialVersionUID = 1L;
+					
+					@Override
+					public Set<Long> createAccumulator() {
+						return new HashSet<>();
+					}
+
+					@Override
+					public void add(Long value, Set<Long> accumulator) {
+						accumulator.add(value);
+					}
+
+					@Override
+					public Map<Long, Long> getResult(Set<Long> accumulator) {
+						Map<Long, Long> result = new HashMap<>();
+						for (Long in : accumulator) {
+							result.put(in, in);
+						}
+						return result;
+					}
+
+					@Override
+					public Set<Long> merge(Set<Long> a, Set<Long> b) {
+						a.addAll(b);
+						return a;
+					}
+				}, mock);
+
+		// check setOutputType
+		TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
+		ExecutionConfig execConf = new ExecutionConfig();
+		execConf.setParallelism(42);
+
+		StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf);
+		verify(mock).setOutputType(stringType, execConf);
+
+		// check open
+		Configuration config = new Configuration();
+
+		windowFunction.open(config);
+		verify(mock).open(config);
+
+		// check setRuntimeContext
+		RuntimeContext rCtx = mock(RuntimeContext.class);
+
+		windowFunction.setRuntimeContext(rCtx);
+		verify(mock).setRuntimeContext(rCtx);
+
+		// check apply
+		TimeWindow w = mock(TimeWindow.class);
+		Collector<String> c = (Collector<String>) mock(Collector.class);
+
+		List<Long> args = new LinkedList<>();
+		args.add(23L);
+		args.add(24L);
+		
+		windowFunction.apply(42L, w, args, c);
+		verify(mock).process(
+				eq(42L),
+				(AggregateProcessWindowFunctionMock.Context) anyObject(),
+				(Iterable) argThat(containsInAnyOrder(allOf(
+						hasEntry(is(23L), is(23L)),
+						hasEntry(is(24L), is(24L))))),
+				eq(c));
+
+		// check close
+		windowFunction.close();
+		verify(mock).close();
+	}
+
 	public static class ProcessWindowFunctionMock
 		extends RichProcessWindowFunction<Long, String, Long, TimeWindow>
 		implements OutputTypeConfigurable<String> {
@@ -301,6 +392,19 @@ public class InternalWindowFunctionTest {
 		public void process(Long aLong, Context context, Iterable<Long> input, Collector<String> out) throws Exception { }
 	}
 
+	public static class AggregateProcessWindowFunctionMock
+			extends RichProcessWindowFunction<Map<Long, Long>, String, Long, TimeWindow>
+			implements OutputTypeConfigurable<String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
+
+		@Override
+		public void process(Long aLong, Context context, Iterable<Map<Long, Long>> input, Collector<String> out) throws Exception { }
+	}
+
 	public static class WindowFunctionMock
 		extends RichWindowFunction<Long, String, Long, TimeWindow>
 		implements OutputTypeConfigurable<String> {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe2a3016/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 96ff334..a5fbeb9 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -326,6 +326,36 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
       accumulatorType, aggregationResultType, resultType))
   }
 
+  /**
+    * Applies the given window function to each window. The window function is called for each
+    * evaluation of the window for each key individually. The output of the window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is pre-aggregated using the given aggregation function.
+    *
+    * @param preAggregator The aggregation function that is used for pre-aggregation
+    * @param windowFunction The window function.
+    * @return The data stream that is the result of applying the window function to the window.
+    */
+  def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]
+  (preAggregator: AggregateFunction[T, ACC, V],
+   windowFunction: ProcessWindowFunction[V, R, K, W]): DataStream[R] = {
+
+    val cleanedPreAggregator = clean(preAggregator)
+    val cleanedWindowFunction = clean(windowFunction)
+
+    val applyFunction = new ScalaProcessWindowFunctionWrapper[V, R, K, W](cleanedWindowFunction)
+
+    val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
+    val aggregationResultType: TypeInformation[V] = implicitly[TypeInformation[V]]
+    val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
+
+    asScalaStream(javaStream.aggregate(
+      cleanedPreAggregator, applyFunction,
+      accumulatorType, aggregationResultType, resultType))
+  }
+
+
   // ---------------------------- fold() ------------------------------------
 
   /**

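For readers following along, here is a minimal usage sketch of the aggregate(AggregateFunction, ProcessWindowFunction) overload added above. The SumCount accumulator, AverageAggregate, EmitAverageWithWindowEnd and the example pipeline are illustrative assumptions, not part of the commit; only the API shape follows the diff.

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Mutable accumulator holding the running (sum, count) for one key and window.
class SumCount(var sum: Long = 0L, var count: Long = 0L) extends Serializable

// Incremental pre-aggregation: only the accumulator is kept as window state.
class AverageAggregate extends AggregateFunction[(String, Long), SumCount, Double] {
  override def createAccumulator(): SumCount = new SumCount()

  override def add(value: (String, Long), accumulator: SumCount): Unit = {
    accumulator.sum += value._2
    accumulator.count += 1
  }

  override def getResult(accumulator: SumCount): Double =
    if (accumulator.count == 0) 0.0 else accumulator.sum.toDouble / accumulator.count

  override def merge(a: SumCount, b: SumCount): SumCount = {
    a.sum += b.sum
    a.count += b.count
    a
  }
}

// Receives the single pre-aggregated average per window and attaches window metadata.
class EmitAverageWithWindowEnd
    extends ProcessWindowFunction[Double, (String, Long, Double), String, TimeWindow] {

  override def process(
      key: String,
      context: Context,
      elements: Iterable[Double],
      out: Collector[(String, Long, Double)]): Unit = {
    elements.foreach(avg => out.collect((key, context.window.getEnd, avg)))
  }
}

object AggregateWithProcessWindowSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    env
      .fromElements(("a", 1L), ("a", 3L), ("b", 5L))
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
      .aggregate(new AverageAggregate, new EmitAverageWithWindowEnd)
      .print()

    env.execute("aggregate + ProcessWindowFunction sketch")
  }
}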

[8/8] flink git commit: [FLINK-4997] [streaming] Add ProcessWindowFunction to Scala API

Posted by al...@apache.org.
[FLINK-4997] [streaming] Add ProcessWindowFunction to Scala API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86dff0e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86dff0e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86dff0e6

Branch: refs/heads/master
Commit: 86dff0e6d584027994dd1320845169cc8b1a83d5
Parents: 1dcb2dc
Author: Ventura Del Monte <ve...@gmail.com>
Authored: Wed Nov 9 10:49:47 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 17 17:15:51 2017 +0100

----------------------------------------------------------------------
 .../streaming/api/scala/WindowedStream.scala    | 144 ++++++++++++++++++-
 .../scala/function/ProcessWindowFunction.scala  |  61 ++++++++
 .../function/RichProcessWindowFunction.scala    |  87 +++++++++++
 .../ScalaProcessWindowFunctionWrapper.scala     |  64 +++++++++
 .../streaming/api/scala/WindowFoldITCase.scala  |  64 ++++++++-
 .../api/scala/WindowFunctionITCase.scala        |  54 ++++++-
 .../api/scala/WindowReduceITCase.scala          |  64 ++++++++-
 ...ckingIdentityRichProcessWindowFunction.scala |  81 +++++++++++
 8 files changed, 605 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index ab27820..96ff334 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -18,14 +18,16 @@
 
 package org.apache.flink.streaming.api.scala
 
+import org.apache.flink.annotation.{Public, PublicEvolving}
+import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction}
 import org.apache.flink.annotation.{PublicEvolving, Public}
 import org.apache.flink.api.common.functions.{AggregateFunction, FoldFunction, ReduceFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream}
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
-import org.apache.flink.streaming.api.scala.function.WindowFunction
-import org.apache.flink.streaming.api.scala.function.util.{ScalaFoldFunction, ScalaReduceFunction, ScalaWindowFunction, ScalaWindowFunctionWrapper}
+import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
+import org.apache.flink.streaming.api.scala.function.util._
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.Trigger
@@ -99,7 +101,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
   // ------------------------------------------------------------------------
 
   // --------------------------- reduce() -----------------------------------
-  
+
   /**
    * Applies a reduce function to the window. The window function is called for each evaluation
    * of the window for each key individually. The output of the reduce function is interpreted
@@ -198,10 +200,58 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     asScalaStream(javaStream.reduce(reducer, applyFunction, implicitly[TypeInformation[R]]))
   }
 
+
+  /**
+    * Applies the given reduce function to each window. The reduced value of the window
+    * is then passed as input to the window function. The output of the window function
+    * is interpreted as a regular non-windowed stream.
+    *
+    * @param preAggregator The reduce function that is used for pre-aggregation
+    * @param function The process window function.
+    * @return The data stream that is the result of applying the window function to the window.
+    */
+  @PublicEvolving
+  def reduce[R: TypeInformation](
+      preAggregator: (T, T) => T,
+      function: ProcessWindowFunction[T, R, K, W]): DataStream[R] = {
+
+    val cleanedPreAggregator = clean(preAggregator)
+    val cleanedWindowFunction = clean(function)
+
+    val reducer = new ScalaReduceFunction[T](cleanedPreAggregator)
+    val applyFunction = new ScalaProcessWindowFunctionWrapper[T, R, K, W](cleanedWindowFunction)
+
+    val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.reduce(reducer, applyFunction, resultType))
+  }
+
+  /**
+    * Applies the given reduce function to each window. The reduced value of the window
+    * is then passed as input to the window function. The output of the window function
+    * is interpreted as a regular non-windowed stream.
+    *
+    * @param preAggregator The reduce function that is used for pre-aggregation
+    * @param function The process window function.
+    * @return The data stream that is the result of applying the window function to the window.
+    */
+  @PublicEvolving
+  def reduce[R: TypeInformation](
+      preAggregator: ReduceFunction[T],
+      function: ProcessWindowFunction[T, R, K, W]): DataStream[R] = {
+
+    val cleanedPreAggregator = clean(preAggregator)
+    val cleanedWindowFunction = clean(function)
+
+    val applyFunction = new ScalaProcessWindowFunctionWrapper[T, R, K, W](cleanedWindowFunction)
+
+    val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.reduce(cleanedPreAggregator, applyFunction, resultType))
+  }
+
   // -------------------------- aggregate() ---------------------------------
 
   /**
-   * Applies the given aggregation function to each window and key. The aggregation function 
+   * Applies the given aggregation function to each window and key. The aggregation function
    * is called for each element, aggregating values incrementally and keeping the state to
    * one accumulator per key and window.
    *
@@ -213,7 +263,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
 
     val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
     val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
-    
+
     asScalaStream(javaStream.aggregate(
       clean(aggregateFunction), accumulatorType, resultType))
   }
@@ -241,7 +291,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
     val aggregationResultType: TypeInformation[V] = implicitly[TypeInformation[V]]
     val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
-    
+
     asScalaStream(javaStream.aggregate(
       cleanedPreAggregator, applyFunction,
       accumulatorType, aggregationResultType, resultType))
@@ -277,7 +327,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
   }
 
   // ---------------------------- fold() ------------------------------------
-  
+
   /**
    * Applies the given fold function to each window. The window function is called for each
    * evaluation of the window for each key individually. The output of the reduce function is
@@ -379,9 +429,89 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     asScalaStream(javaStream.fold(initialValue, folder, applyFunction, accType, resultType))
   }
 
+
+  /**
+    * Applies the given fold function to each window. The folded value of the window
+    * is then passed as input to the process window function.
+    * The output of the process window function is interpreted as a regular non-windowed stream.
+    *
+    * @param initialValue The initial value of the fold
+    * @param foldFunction The fold function that is used for incremental aggregation
+    * @param function The process window function.
+    * @return The data stream that is the result of applying the window function to the window.
+    */
+  @PublicEvolving
+  def fold[R: TypeInformation, ACC: TypeInformation](
+      initialValue: ACC,
+      foldFunction: (ACC, T) => ACC,
+      function: ProcessWindowFunction[ACC, R, K, W]): DataStream[R] = {
+
+    val cleanedFunction = clean(function)
+    val cleanedFoldFunction = clean(foldFunction)
+
+    val folder = new ScalaFoldFunction[T, ACC](cleanedFoldFunction)
+    val applyFunction = new ScalaProcessWindowFunctionWrapper[ACC, R, K, W](cleanedFunction)
+
+    asScalaStream(javaStream.fold(
+      initialValue,
+      folder,
+      applyFunction,
+      implicitly[TypeInformation[ACC]],
+      implicitly[TypeInformation[R]]))
+  }
+
+  /**
+    * Applies the given fold function to each window. The folded value of the window
+    * is then passed as input to the process window function.
+    * The output of the process window function is interpreted as a regular non-windowed stream.
+    *
+    * @param initialValue The initial value of the fold
+    * @param foldFunction The fold function that is used for incremental aggregation
+    * @param function The process window function.
+    * @return The data stream that is the result of applying the window function to the window.
+    */
+  @PublicEvolving
+  def fold[R: TypeInformation, ACC: TypeInformation](
+      initialValue: ACC,
+      foldFunction: FoldFunction[T, ACC],
+      function: ProcessWindowFunction[ACC, R, K, W]): DataStream[R] = {
+
+    val cleanedFunction = clean(function)
+    val cleanedFoldFunction = clean(foldFunction)
+
+    val applyFunction = new ScalaProcessWindowFunctionWrapper[ACC, R, K, W](cleanedFunction)
+
+    asScalaStream(javaStream.fold(
+      initialValue,
+      cleanedFoldFunction,
+      applyFunction,
+      implicitly[TypeInformation[ACC]],
+      implicitly[TypeInformation[R]]))
+  }
+
   // ---------------------------- apply() -------------------------------------
 
   /**
+    * Applies the given window function to each window. The window function is called for each
+    * evaluation of the window for each key individually. The output of the window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Note that this function requires that all data in the windows is buffered until the window
+    * is evaluated, as the function provides no means of pre-aggregation.
+    *
+    * @param function The window function.
+    * @return The data stream that is the result of applying the window function to the window.
+    */
+  @PublicEvolving
+  def process[R: TypeInformation](
+      function: ProcessWindowFunction[T, R, K, W]): DataStream[R] = {
+
+    val cleanFunction = clean(function)
+    val applyFunction = new ScalaProcessWindowFunctionWrapper[T, R, K, W](cleanFunction)
+    asScalaStream(javaStream.process(applyFunction, implicitly[TypeInformation[R]]))
+  }
+
+  /**
    * Applies the given window function to each window. The window function is called for each
    * evaluation of the window for each key individually. The output of the window function is
    * interpreted as a regular non-windowed stream.

http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
new file mode 100644
index 0000000..79f3918
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function
+
+import java.io.Serializable
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.functions.Function
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+/**
+  * Base abstract class for functions that are evaluated over keyed (grouped)
+  * windows using a context for retrieving extra information.
+  *
+  * @tparam IN The type of the input value.
+  * @tparam OUT The type of the output value.
+  * @tparam KEY The type of the key.
+  * @tparam W The type of the window.
+  */
+@PublicEvolving
+abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
+  /**
+    * Evaluates the window and outputs none or several elements.
+    *
+    * @param key      The key for which this window is evaluated.
+    * @param context  The context in which the window is being evaluated.
+    * @param elements The elements in the window being evaluated.
+    * @param out      A collector for emitting elements.
+    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+    */
+  @throws[Exception]
+  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT])
+
+  /**
+    * The context holding window metadata
+    */
+  abstract class Context {
+    /**
+      * @return The window that is being evaluated.
+      */
+    def window: W
+  }
+
+}
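
As an illustration of the contract, a hypothetical subclass of the new abstract class; the CountPerWindow name and its output type are assumptions, not part of this commit.

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Counts the elements of each evaluated window and emits (key, window end, count).
class CountPerWindow[IN]
    extends ProcessWindowFunction[IN, (String, Long, Long), String, TimeWindow] {

  override def process(
      key: String,
      context: Context,
      elements: Iterable[IN],
      out: Collector[(String, Long, Long)]): Unit = {
    // context.window gives access to the window that is being evaluated.
    out.collect((key, context.window.getEnd, elements.size.toLong))
  }
}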

http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
new file mode 100644
index 0000000..320685a
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function
+
+import java.beans.Transient
+
+import org.apache.flink.annotation.Public
+import org.apache.flink.api.common.functions.{IterationRuntimeContext, RichFunction, RuntimeContext}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+  * Base abstract class for functions that are evaluated over
+  * keyed (grouped) windows using a context for retrieving extra information.
+  *
+  * @tparam IN The type of the input value.
+  * @tparam OUT The type of the output value.
+  * @tparam KEY The type of the key.
+  * @tparam W The type of the window.
+  */
+@Public
+abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window]
+    extends ProcessWindowFunction[IN, OUT, KEY, W]
+    with RichFunction {
+
+  @Transient
+  private var runtimeContext: RuntimeContext = null
+
+  // --------------------------------------------------------------------------------------------
+  //  Runtime context access
+  // --------------------------------------------------------------------------------------------
+
+  override def setRuntimeContext(t: RuntimeContext) {
+    this.runtimeContext = t
+  }
+
+  override def getRuntimeContext: RuntimeContext = {
+    if (this.runtimeContext != null) {
+      this.runtimeContext
+    }
+    else {
+      throw new IllegalStateException("The runtime context has not been initialized.")
+    }
+  }
+
+  override def getIterationRuntimeContext: IterationRuntimeContext = {
+    if (this.runtimeContext == null) {
+      throw new IllegalStateException("The runtime context has not been initialized.")
+    }
+    else {
+      this.runtimeContext match {
+        case iterationRuntimeContext: IterationRuntimeContext => iterationRuntimeContext
+        case _ =>
+          throw new IllegalStateException("This stub is not part of an iteration step function.")
+      }
+    }
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Default life cycle methods
+  // --------------------------------------------------------------------------------------------
+  
+  @throws[Exception]
+  override def open(parameters: Configuration) {
+  }
+
+  @throws[Exception]
+  override def close() {
+  }
+}
+
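
A hypothetical subclass sketching what the rich variant is meant to enable (note that the related IT cases in this commit are still @Ignore'd); TaggingProcessWindowFunction and the subtask-index prefix are illustrative only.

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.RichProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Tags every emitted element with the subtask index, which is only reachable
// through the runtime context that the rich variant provides.
class TaggingProcessWindowFunction
    extends RichProcessWindowFunction[String, String, String, TimeWindow] {

  private var prefix: String = _

  override def open(parameters: Configuration): Unit = {
    // The runtime context is available once the function has been opened.
    prefix = s"subtask-${getRuntimeContext.getIndexOfThisSubtask}: "
  }

  override def process(
      key: String,
      context: Context,
      elements: Iterable[String],
      out: Collector[String]): Unit = {
    elements.foreach(e => out.collect(prefix + e))
  }
}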

http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
new file mode 100644
index 0000000..4a20371
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function.util
+
+import org.apache.flink.api.common.functions.{IterationRuntimeContext, RuntimeContext}
+import org.apache.flink.api.java.operators.translation.WrappingFunction
+import org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => JProcessWindowFunction}
+import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+
+/**
+  * A wrapper function that exposes a Scala ProcessWindowFunction
+  * as a Java ProcessWindowFunction.
+  *
+  * The Scala and Java process window functions differ in their type of "Iterable":
+  *   - Scala ProcessWindowFunction: scala.Iterable
+  *   - Java ProcessWindowFunction: java.lang.Iterable
+  */
+final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
+    private[this] val func: ProcessWindowFunction[IN, OUT, KEY, W])
+    extends WrappingFunction[ProcessWindowFunction[IN, OUT, KEY, W]](func)
+    with JProcessWindowFunctionTrait[IN, OUT, KEY, W] {
+
+  override def process(
+      key: KEY,
+      context: JProcessWindowFunction[IN, OUT, KEY, W]#Context,
+      elements: java.lang.Iterable[IN],
+      out: Collector[OUT]): Unit = {
+    val ctx = new func.Context {
+      override def window = context.window
+    }
+    func.process(key, ctx, elements.asScala, out)
+  }
+
+  override def getRuntimeContext: RuntimeContext = {
+    throw new RuntimeException("This should never be called")
+  }
+
+  override def getIterationRuntimeContext: IterationRuntimeContext = {
+    throw new RuntimeException("This should never be called")
+  }
+}
+
+private trait JProcessWindowFunctionTrait[IN, OUT, KEY, W]
+  extends JProcessWindowFunction[IN, OUT, KEY, W]

http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
index 83697ce..a23145c 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
@@ -26,13 +26,13 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichWindowFunction, CheckingIdentityRichAllWindowFunction}
+import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Test
+import org.junit.{Ignore, Test}
 import org.junit.Assert._
 
 import scala.collection.mutable
@@ -150,6 +150,66 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
   }
 
   @Test
+  @Ignore
+  def testFoldWithProcessWindowFunction(): Unit = {
+    WindowFoldITCase.testResults = mutable.MutableList()
+    CheckingIdentityRichProcessWindowFunction.reset()
+
+    val foldFunc = new FoldFunction[(String, Int), (Int, String)] {
+      override def fold(accumulator: (Int, String), value: (String, Int)): (Int, String) = {
+        (accumulator._1 + value._2, accumulator._2 + value._1)
+      }
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 6))
+        ctx.collect(("a", 7))
+        ctx.collect(("a", 8))
+
+        // source is finite, so it will have an implicit MAX watermark when it finishes
+      }
+
+      def cancel() {
+      }
+    }).assignTimestampsAndWatermarks(new WindowFoldITCase.Tuple2TimestampExtractor)
+
+    source1
+      .keyBy(0)
+      .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .fold(
+        (0, "R:"),
+        foldFunc,
+        new CheckingIdentityRichProcessWindowFunction[(Int, String), Tuple, TimeWindow]())
+      .addSink(new SinkFunction[(Int, String)]() {
+        def invoke(value: (Int, String)) {
+          WindowFoldITCase.testResults += value.toString
+        }
+      })
+
+    env.execute("Fold Process Window Test")
+
+    val expectedResult = mutable.MutableList(
+      "(3,R:aaa)",
+      "(21,R:aaa)",
+      "(12,R:bbb)")
+
+    assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted)
+
+    CheckingIdentityRichProcessWindowFunction.checkRichMethodCalls()
+  }
+
+  @Test
   def testFoldAllWindow(): Unit = {
     WindowFoldITCase.testResults = mutable.MutableList()
     

http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
index c38f422..bfbe6ee 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
@@ -25,13 +25,13 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichWindowFunction}
+import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Ignore, Test}
 
 import scala.collection.mutable
 
@@ -87,6 +87,56 @@ class WindowFunctionITCase {
   }
 
   @Test
+  @Ignore
+  def testRichProcessWindowFunction(): Unit = {
+    WindowFunctionITCase.testResults = mutable.MutableList()
+    CheckingIdentityRichProcessWindowFunction.reset()
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 6))
+        ctx.collect(("a", 7))
+        ctx.collect(("a", 8))
+
+        // source is finite, so it will have an implicit MAX watermark when it finishes
+      }
+
+      def cancel() {}
+
+    }).assignTimestampsAndWatermarks(new WindowFunctionITCase.Tuple2TimestampExtractor)
+
+    source1
+      .keyBy(0)
+      .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .process(new CheckingIdentityRichProcessWindowFunction[(String, Int), Tuple, TimeWindow]())
+      .addSink(new SinkFunction[(String, Int)]() {
+        def invoke(value: (String, Int)) {
+          WindowFunctionITCase.testResults += value.toString
+        }
+      })
+
+    env.execute("RichProcessWindowFunction Test")
+
+    val expectedResult = mutable.MutableList(
+      "(a,0)", "(a,1)", "(a,2)", "(a,6)", "(a,7)", "(a,8)",
+      "(b,3)", "(b,4)", "(b,5)")
+
+    assertEquals(expectedResult.sorted, WindowFunctionITCase.testResults.sorted)
+
+    CheckingIdentityRichProcessWindowFunction.checkRichMethodCalls()
+  }
+
+  @Test
   def testRichAllWindowFunction(): Unit = {
     WindowFunctionITCase.testResults = mutable.MutableList()
     CheckingIdentityRichAllWindowFunction.reset()

http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
index 9666266..5418108 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
@@ -26,15 +26,14 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichWindowFunction}
+import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Ignore, Test}
 
 import scala.collection.mutable
 
@@ -150,6 +149,65 @@ class WindowReduceITCase extends StreamingMultipleProgramsTestBase {
   }
 
   @Test
+  @Ignore
+  def testReduceWithProcessWindowFunction(): Unit = {
+    WindowReduceITCase.testResults = mutable.MutableList()
+    CheckingIdentityRichProcessWindowFunction.reset()
+
+    val reduceFunc = new ReduceFunction[(String, Int)] {
+      override def reduce(a: (String, Int), b: (String, Int)): (String, Int) = {
+        (a._1 + b._1, a._2 + b._2)
+      }
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 6))
+        ctx.collect(("a", 7))
+        ctx.collect(("a", 8))
+
+        // source is finite, so it will have an implicit MAX watermark when it finishes
+      }
+
+      def cancel() {
+      }
+    }).assignTimestampsAndWatermarks(new WindowReduceITCase.Tuple2TimestampExtractor)
+
+    source1
+      .keyBy(0)
+      .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .reduce(
+        reduceFunc,
+        new CheckingIdentityRichProcessWindowFunction[(String, Int), Tuple, TimeWindow]())
+      .addSink(new SinkFunction[(String, Int)]() {
+        def invoke(value: (String, Int)) {
+          WindowReduceITCase.testResults += value.toString
+        }
+      })
+
+    env.execute("Reduce Process Window Test")
+
+    val expectedResult = mutable.MutableList(
+      "(aaa,3)",
+      "(aaa,21)",
+      "(bbb,12)")
+
+    assertEquals(expectedResult.sorted, WindowReduceITCase.testResults.sorted)
+
+    CheckingIdentityRichProcessWindowFunction.checkRichMethodCalls()
+  }
+
+  @Test
   def testReduceAllWindow(): Unit = {
     WindowReduceITCase.testResults = mutable.MutableList()
     

http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
new file mode 100644
index 0000000..d62f2d3
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.testutils
+
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.scala.function.RichProcessWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+
+class CheckingIdentityRichProcessWindowFunction[T, K, W <: Window]
+  extends RichProcessWindowFunction[T, T, K, W] {
+
+  override def process(key: K, context: Context, input: Iterable[T], out: Collector[T]): Unit = {
+    for (value <- input) {
+      out.collect(value)
+    }
+  }
+
+  override def open(conf: Configuration): Unit = {
+    super.open(conf)
+    CheckingIdentityRichProcessWindowFunction.openCalled = true
+  }
+
+  override def close(): Unit = {
+    super.close()
+    CheckingIdentityRichProcessWindowFunction.closeCalled = true
+  }
+
+  override def setRuntimeContext(context: RuntimeContext): Unit = {
+    super.setRuntimeContext(context)
+    CheckingIdentityRichProcessWindowFunction.contextSet = true
+  }
+}
+
+object CheckingIdentityRichProcessWindowFunction {
+
+  @volatile
+  private[CheckingIdentityRichProcessWindowFunction] var closeCalled = false
+
+  @volatile
+  private[CheckingIdentityRichProcessWindowFunction] var openCalled = false
+
+  @volatile
+  private[CheckingIdentityRichProcessWindowFunction] var contextSet = false
+
+  def reset(): Unit = {
+    closeCalled = false
+    openCalled = false
+    contextSet = false
+  }
+
+  def checkRichMethodCalls(): Unit = {
+    if (!contextSet) {
+      throw new AssertionError("context not set")
+    }
+    if (!openCalled) {
+      throw new AssertionError("open() not called")
+    }
+    if (!closeCalled) {
+      throw new AssertionError("close() not called")
+    }
+  }
+}