You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/02/17 16:24:18 UTC

[8/8] flink git commit: [FLINK-4997] [streaming] Add ProcessWindowFunction to Scala API

[FLINK-4997] [streaming] Add ProcessWindowFunction to Scala API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86dff0e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86dff0e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86dff0e6

Branch: refs/heads/master
Commit: 86dff0e6d584027994dd1320845169cc8b1a83d5
Parents: 1dcb2dc
Author: Ventura Del Monte <ve...@gmail.com>
Authored: Wed Nov 9 10:49:47 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 17 17:15:51 2017 +0100

----------------------------------------------------------------------
 .../streaming/api/scala/WindowedStream.scala    | 144 ++++++++++++++++++-
 .../scala/function/ProcessWindowFunction.scala  |  61 ++++++++
 .../function/RichProcessWindowFunction.scala    |  87 +++++++++++
 .../ScalaProcessWindowFunctionWrapper.scala     |  64 +++++++++
 .../streaming/api/scala/WindowFoldITCase.scala  |  64 ++++++++-
 .../api/scala/WindowFunctionITCase.scala        |  54 ++++++-
 .../api/scala/WindowReduceITCase.scala          |  64 ++++++++-
 ...ckingIdentityRichProcessWindowFunction.scala |  81 +++++++++++
 8 files changed, 605 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index ab27820..96ff334 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -18,14 +18,16 @@
 
 package org.apache.flink.streaming.api.scala
 
+import org.apache.flink.annotation.{Public, PublicEvolving}
+import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction}
 import org.apache.flink.annotation.{PublicEvolving, Public}
 import org.apache.flink.api.common.functions.{AggregateFunction, FoldFunction, ReduceFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream}
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
-import org.apache.flink.streaming.api.scala.function.WindowFunction
-import org.apache.flink.streaming.api.scala.function.util.{ScalaFoldFunction, ScalaReduceFunction, ScalaWindowFunction, ScalaWindowFunctionWrapper}
+import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
+import org.apache.flink.streaming.api.scala.function.util._
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.Trigger
@@ -99,7 +101,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
   // ------------------------------------------------------------------------
 
   // --------------------------- reduce() -----------------------------------
-  
+
   /**
    * Applies a reduce function to the window. The window function is called for each evaluation
    * of the window for each key individually. The output of the reduce function is interpreted
@@ -198,10 +200,58 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     asScalaStream(javaStream.reduce(reducer, applyFunction, implicitly[TypeInformation[R]]))
   }
 
+
+  /**
+    * Applies the given reduce function to each window. The reduced value of each window is
+    * then passed as the input of the process window function. The output of that function
+    * is interpreted as a regular non-windowed stream.
+    *
+    * @param preAggregator The reduce function (a Scala closure) that is used for pre-aggregation
+    * @param function The process window function.
+    * @return The data stream that is the result of applying the window function to the window.
+    */
+  @PublicEvolving
+  def reduce[R: TypeInformation](
+      preAggregator: (T, T) => T,
+      function: ProcessWindowFunction[T, R, K, W]): DataStream[R] = {
+
+    val cleanedPreAggregator = clean(preAggregator)
+    val cleanedWindowFunction = clean(function)
+
+    val reducer = new ScalaReduceFunction[T](cleanedPreAggregator)
+    val applyFunction = new ScalaProcessWindowFunctionWrapper[T, R, K, W](cleanedWindowFunction)
+
+    val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.reduce(reducer, applyFunction, resultType))
+  }
+
+  /**
+    * Applies the given reduce function to each window. The reduced value of each window is
+    * then passed as the input of the process window function. The output of that function
+    * is interpreted as a regular non-windowed stream.
+    *
+    * @param preAggregator The ReduceFunction instance that is used for pre-aggregation
+    * @param function The process window function.
+    * @return The data stream that is the result of applying the window function to the window.
+    */
+  @PublicEvolving
+  def reduce[R: TypeInformation](
+      preAggregator: ReduceFunction[T],
+      function: ProcessWindowFunction[T, R, K, W]): DataStream[R] = {
+
+    val cleanedPreAggregator = clean(preAggregator)
+    val cleanedWindowFunction = clean(function)
+
+    val applyFunction = new ScalaProcessWindowFunctionWrapper[T, R, K, W](cleanedWindowFunction)
+
+    val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.reduce(cleanedPreAggregator, applyFunction, resultType))
+  }
+
   // -------------------------- aggregate() ---------------------------------
 
   /**
-   * Applies the given aggregation function to each window and key. The aggregation function 
+   * Applies the given aggregation function to each window and key. The aggregation function
    * is called for each element, aggregating values incrementally and keeping the state to
    * one accumulator per key and window.
    *
@@ -213,7 +263,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
 
     val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
     val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
-    
+
     asScalaStream(javaStream.aggregate(
       clean(aggregateFunction), accumulatorType, resultType))
   }
@@ -241,7 +291,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
     val aggregationResultType: TypeInformation[V] = implicitly[TypeInformation[V]]
     val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
-    
+
     asScalaStream(javaStream.aggregate(
       cleanedPreAggregator, applyFunction,
       accumulatorType, aggregationResultType, resultType))
@@ -277,7 +327,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
   }
 
   // ---------------------------- fold() ------------------------------------
-  
+
   /**
    * Applies the given fold function to each window. The window function is called for each
    * evaluation of the window for each key individually. The output of the reduce function is
@@ -379,9 +429,89 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     asScalaStream(javaStream.fold(initialValue, folder, applyFunction, accType, resultType))
   }
 
+
+  /**
+    * Applies the given fold function to each window. The folded value of each window is
+    * then passed as the input of the process window function.
+    * The output of the process window function is interpreted as a regular non-windowed stream.
+    *
+    * @param initialValue The initial value of the fold
+    * @param foldFunction The fold function (a Scala closure) used for incremental aggregation
+    * @param function The process window function.
+    * @return The data stream that is the result of applying the window function to the window.
+    */
+  @PublicEvolving
+  def fold[R: TypeInformation, ACC: TypeInformation](
+      initialValue: ACC,
+      foldFunction: (ACC, T) => ACC,
+      function: ProcessWindowFunction[ACC, R, K, W]): DataStream[R] = {
+
+    val cleanedFunction = clean(function)
+    val cleanedFoldFunction = clean(foldFunction)
+
+    val folder = new ScalaFoldFunction[T, ACC](cleanedFoldFunction)
+    val applyFunction = new ScalaProcessWindowFunctionWrapper[ACC, R, K, W](cleanedFunction)
+
+    asScalaStream(javaStream.fold(
+      initialValue,
+      folder,
+      applyFunction,
+      implicitly[TypeInformation[ACC]],
+      implicitly[TypeInformation[R]]))
+  }
+
+  /**
+    * Applies the given fold function to each window. The folded value of each window is
+    * then passed as the input of the process window function.
+    * The output of the process window function is interpreted as a regular non-windowed stream.
+    *
+    * @param initialValue The initial value of the fold
+    * @param foldFunction The FoldFunction instance that is used for incremental aggregation
+    * @param function The process window function.
+    * @return The data stream that is the result of applying the window function to the window.
+    */
+  @PublicEvolving
+  def fold[R: TypeInformation, ACC: TypeInformation](
+      initialValue: ACC,
+      foldFunction: FoldFunction[T, ACC],
+      function: ProcessWindowFunction[ACC, R, K, W]): DataStream[R] = {
+
+    val cleanedFunction = clean(function)
+    val cleanedFoldFunction = clean(foldFunction)
+
+    val applyFunction = new ScalaProcessWindowFunctionWrapper[ACC, R, K, W](cleanedFunction)
+
+    asScalaStream(javaStream.fold(
+      initialValue,
+      cleanedFoldFunction,
+      applyFunction,
+      implicitly[TypeInformation[ACC]],
+      implicitly[TypeInformation[R]]))
+  }
+
   // ---------------------------- apply() -------------------------------------
 
   /**
+    * Applies the given process window function to each window. The function is called for each
+    * evaluation of the window for each key individually. The output of the function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Note that this function requires that all data in the windows is buffered until the window
+    * is evaluated, as the function provides no means of pre-aggregation.
+    *
+    * @param function The process window function.
+    * @return The data stream that is the result of applying the window function to the window.
+    */
+  @PublicEvolving
+  def process[R: TypeInformation](
+      function: ProcessWindowFunction[T, R, K, W]): DataStream[R] = {
+
+    val cleanFunction = clean(function)
+    val applyFunction = new ScalaProcessWindowFunctionWrapper[T, R, K, W](cleanFunction)
+    asScalaStream(javaStream.process(applyFunction, implicitly[TypeInformation[R]]))
+  }
+
+  /**
    * Applies the given window function to each window. The window function is called for each
    * evaluation of the window for each key individually. The output of the window function is
    * interpreted as a regular non-windowed stream.

http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
new file mode 100644
index 0000000..79f3918
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function
+
+import java.io.Serializable
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.functions.Function
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+/**
+  * Base abstract class for functions that are evaluated over keyed (grouped)
+  * windows using a context for retrieving extra information.
+  *
+  * @tparam IN The type of the input value.
+  * @tparam OUT The type of the output value.
+  * @tparam KEY The type of the key.
+  * @tparam W The type of the window.
+  */
+@PublicEvolving
+abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
+  /**
+    * Evaluates the window and outputs zero or more elements.
+    *
+    * @param key      The key for which this window is evaluated.
+    * @param context  The context in which the window is being evaluated.
+    * @param elements The elements in the window being evaluated.
+    * @param out      A collector for emitting elements.
+    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+    */
+  @throws[Exception]
+  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
+
+  /**
+    * The context holding window metadata.
+    */
+  abstract class Context {
+    /**
+      * @return The window that is being evaluated.
+      */
+    def window: W
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
new file mode 100644
index 0000000..320685a
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function
+
+import java.beans.Transient
+
+import org.apache.flink.annotation.Public
+import org.apache.flink.api.common.functions.{IterationRuntimeContext, RichFunction, RuntimeContext}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+  * Rich variant of ProcessWindowFunction: a function evaluated over keyed (grouped) windows
+  * that also implements RichFunction (runtime context access and open/close life cycle).
+  *
+  * @tparam IN The type of the input value.
+  * @tparam OUT The type of the output value.
+  * @tparam KEY The type of the key.
+  * @tparam W The type of the window.
+  */
+@Public
+abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window]
+    extends ProcessWindowFunction[IN, OUT, KEY, W]
+    with RichFunction {
+
+  @transient // scala.transient (JVM `transient`), not java.beans.Transient: skip on serialization
+  private var runtimeContext: RuntimeContext = null
+
+  // --------------------------------------------------------------------------------------------
+  //  Runtime context access
+  // --------------------------------------------------------------------------------------------
+
+  override def setRuntimeContext(t: RuntimeContext): Unit = {
+    this.runtimeContext = t
+  }
+
+  override def getRuntimeContext: RuntimeContext = {
+    if (this.runtimeContext != null) {
+      this.runtimeContext
+    }
+    else {
+      throw new IllegalStateException("The runtime context has not been initialized.")
+    }
+  }
+
+  override def getIterationRuntimeContext: IterationRuntimeContext = {
+    if (this.runtimeContext == null) {
+      throw new IllegalStateException("The runtime context has not been initialized.")
+    }
+    else {
+      this.runtimeContext match {
+        case iterationRuntimeContext: IterationRuntimeContext => iterationRuntimeContext
+        case _ =>
+          throw new IllegalStateException("This stub is not part of an iteration step function.")
+      }
+    }
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Default life cycle methods
+  // --------------------------------------------------------------------------------------------
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+  }
+
+  @throws[Exception]
+  override def close(): Unit = {
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
new file mode 100644
index 0000000..4a20371
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function.util
+
+import org.apache.flink.api.common.functions.{IterationRuntimeContext, RuntimeContext}
+import org.apache.flink.api.java.operators.translation.WrappingFunction
+import org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => JProcessWindowFunction}
+import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+
+/**
+  * A wrapper function that exposes a Scala ProcessWindowFunction
+  * as a Java ProcessWindowFunction.
+  *
+  * The Scala and Java process window functions differ in their type of "Iterable":
+  *   - Scala ProcessWindowFunction: scala.Iterable
+  *   - Java ProcessWindowFunction: java.lang.Iterable
+  */
+final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
+    private[this] val func: ProcessWindowFunction[IN, OUT, KEY, W])
+    extends WrappingFunction[ProcessWindowFunction[IN, OUT, KEY, W]](func)
+    with JProcessWindowFunctionTrait[IN, OUT, KEY, W] {
+
+  override def process(
+      key: KEY,
+      context: JProcessWindowFunction[IN, OUT, KEY, W]#Context,
+      elements: java.lang.Iterable[IN],
+      out: Collector[OUT]): Unit = {
+    val ctx = new func.Context {
+      override def window = context.window
+    }
+    func.process(key, ctx, elements.asScala, out)
+  }
+
+  override def getRuntimeContext: RuntimeContext = {
+    throw new RuntimeException("This should never be called")
+  }
+
+  override def getIterationRuntimeContext: IterationRuntimeContext = {
+    throw new RuntimeException("This should never be called")
+  }
+}
+
+private trait JProcessWindowFunctionTrait[IN, OUT, KEY, W]
+  extends JProcessWindowFunction[IN, OUT, KEY, W]

http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
index 83697ce..a23145c 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
@@ -26,13 +26,13 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichWindowFunction, CheckingIdentityRichAllWindowFunction}
+import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Test
+import org.junit.{Ignore, Test}
 import org.junit.Assert._
 
 import scala.collection.mutable
@@ -150,6 +150,66 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
   }
 
   @Test
+  @Ignore
+  def testFoldWithProcessWindowFunction(): Unit = {
+    WindowFoldITCase.testResults = mutable.MutableList()
+    CheckingIdentityRichProcessWindowFunction.reset()
+
+    val foldFunc = new FoldFunction[(String, Int), (Int, String)] {
+      override def fold(accumulator: (Int, String), value: (String, Int)): (Int, String) = {
+        (accumulator._1 + value._2, accumulator._2 + value._1)
+      }
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 6))
+        ctx.collect(("a", 7))
+        ctx.collect(("a", 8))
+
+        // the source is finite, so an implicit MAX watermark is emitted once it finishes
+      }
+
+      def cancel() {
+      }
+    }).assignTimestampsAndWatermarks(new WindowFoldITCase.Tuple2TimestampExtractor)
+
+    source1
+      .keyBy(0)
+      .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .fold(
+        (0, "R:"),
+        foldFunc,
+        new CheckingIdentityRichProcessWindowFunction[(Int, String), Tuple, TimeWindow]())
+      .addSink(new SinkFunction[(Int, String)]() {
+        def invoke(value: (Int, String)) {
+          WindowFoldITCase.testResults += value.toString
+        }
+      })
+
+    env.execute("Fold Process Window Test")
+
+    val expectedResult = mutable.MutableList(
+      "(3,R:aaa)",
+      "(21,R:aaa)",
+      "(12,R:bbb)")
+
+    assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted)
+
+    CheckingIdentityRichProcessWindowFunction.checkRichMethodCalls()
+  }
+
+  @Test
   def testFoldAllWindow(): Unit = {
     WindowFoldITCase.testResults = mutable.MutableList()
     

http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
index c38f422..bfbe6ee 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
@@ -25,13 +25,13 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichWindowFunction}
+import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Ignore, Test}
 
 import scala.collection.mutable
 
@@ -87,6 +87,56 @@ class WindowFunctionITCase {
   }
 
   @Test
+  @Ignore
+  def testRichProcessWindowFunction(): Unit = {
+    WindowFunctionITCase.testResults = mutable.MutableList()
+    CheckingIdentityRichProcessWindowFunction.reset()
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 6))
+        ctx.collect(("a", 7))
+        ctx.collect(("a", 8))
+
+        // the source is finite, so an implicit MAX watermark is emitted once it finishes
+      }
+
+      def cancel() {}
+
+    }).assignTimestampsAndWatermarks(new WindowFunctionITCase.Tuple2TimestampExtractor)
+
+    source1
+      .keyBy(0)
+      .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .process(new CheckingIdentityRichProcessWindowFunction[(String, Int), Tuple, TimeWindow]())
+      .addSink(new SinkFunction[(String, Int)]() {
+        def invoke(value: (String, Int)) {
+          WindowFunctionITCase.testResults += value.toString
+        }
+      })
+
+    env.execute("RichProcessWindowFunction Test")
+
+    val expectedResult = mutable.MutableList(
+      "(a,0)", "(a,1)", "(a,2)", "(a,6)", "(a,7)", "(a,8)",
+      "(b,3)", "(b,4)", "(b,5)")
+
+    assertEquals(expectedResult.sorted, WindowFunctionITCase.testResults.sorted)
+
+    CheckingIdentityRichProcessWindowFunction.checkRichMethodCalls()
+  }
+
+  @Test
   def testRichAllWindowFunction(): Unit = {
     WindowFunctionITCase.testResults = mutable.MutableList()
     CheckingIdentityRichAllWindowFunction.reset()

http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
index 9666266..5418108 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
@@ -26,15 +26,14 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichWindowFunction}
+import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Ignore, Test}
 
 import scala.collection.mutable
 
@@ -150,6 +149,65 @@ class WindowReduceITCase extends StreamingMultipleProgramsTestBase {
   }
 
   @Test
+  @Ignore
+  def testReduceWithProcessWindowFunction(): Unit = {
+    // NOTE(review): the test carries @Ignore above; the reason is not recorded here.
+    // Start from an empty result buffer and cleared lifecycle flags.
+    WindowReduceITCase.testResults = mutable.MutableList()
+    CheckingIdentityRichProcessWindowFunction.reset()
+
+    // Pre-aggregator: concatenates the String fields and sums the Int fields.
+    val reduceFunc = new ReduceFunction[(String, Int)] {
+      override def reduce(a: (String, Int), b: (String, Int)): (String, Int) =
+        (a._1 + b._1, a._2 + b._2)
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]): Unit = {
+        val records = Seq(
+          ("a", 0), ("a", 1), ("a", 2),
+          ("b", 3), ("b", 4), ("b", 5),
+          ("a", 6), ("a", 7), ("a", 8))
+        records.foreach(record => ctx.collect(record))
+
+        // source is finite, so it will have an implicit MAX watermark when it finishes
+      }
+
+      def cancel(): Unit = {}
+    }).assignTimestampsAndWatermarks(new WindowReduceITCase.Tuple2TimestampExtractor)
+
+    // The identity function re-emits the reduced value and records its lifecycle calls.
+    val identityFunc =
+      new CheckingIdentityRichProcessWindowFunction[(String, Int), Tuple, TimeWindow]()
+
+    source1
+      .keyBy(0)
+      .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .reduce(reduceFunc, identityFunc)
+      .addSink(new SinkFunction[(String, Int)]() {
+        def invoke(value: (String, Int)): Unit = {
+          WindowReduceITCase.testResults += value.toString
+        }
+      })
+
+    env.execute("Reduce Process Window Test")
+
+    // Expect one reduced record per key and window
+    // (presumably the extractor uses the Int field as event time -- TODO confirm).
+    val expectedResult = mutable.MutableList("(aaa,3)", "(aaa,21)", "(bbb,12)")
+
+    // Both sides are sorted so the assertion does not depend on emission order.
+    assertEquals(expectedResult.sorted, WindowReduceITCase.testResults.sorted)
+
+    // Finally verify that setRuntimeContext(), open() and close() were all invoked.
+    CheckingIdentityRichProcessWindowFunction.checkRichMethodCalls()
+  }
+
+  @Test
   def testReduceAllWindow(): Unit = {
     WindowReduceITCase.testResults = mutable.MutableList()
     

http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
new file mode 100644
index 0000000..d62f2d3
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.testutils
+
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.scala.function.RichProcessWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+
+/** Forwards window contents unchanged; lifecycle callbacks raise flags on the companion. */
+class CheckingIdentityRichProcessWindowFunction[T, K, W <: Window]
+  extends RichProcessWindowFunction[T, T, K, W] {
+  import CheckingIdentityRichProcessWindowFunction.{closeCalled, contextSet, openCalled}
+
+  // Each lifecycle override delegates to the parent first and then raises its flag.
+  override def setRuntimeContext(context: RuntimeContext): Unit = {
+    super.setRuntimeContext(context)
+    contextSet = true
+  }
+
+  override def open(conf: Configuration): Unit = {
+    super.open(conf)
+    openCalled = true
+  }
+
+  override def close(): Unit = {
+    super.close()
+    closeCalled = true
+  }
+
+  override def process(key: K, context: Context, input: Iterable[T], out: Collector[T]): Unit =
+    input.foreach(element => out.collect(element))
+}
+
+object CheckingIdentityRichProcessWindowFunction {
+
+  // Flags raised by the companion class when the matching lifecycle method runs.
+  @volatile private[CheckingIdentityRichProcessWindowFunction] var closeCalled = false
+  @volatile private[CheckingIdentityRichProcessWindowFunction] var openCalled = false
+  @volatile private[CheckingIdentityRichProcessWindowFunction] var contextSet = false
+
+  /** Clears all three lifecycle flags. */
+  def reset(): Unit = {
+    closeCalled = false
+    openCalled = false
+    contextSet = false
+  }
+
+  /**
+   * Verifies the flags in the order context, open, close.
+   *
+   * @throws AssertionError naming the first lifecycle call that is still missing
+   */
+  def checkRichMethodCalls(): Unit = {
+    ensure(contextSet, "context not set")
+    ensure(openCalled, "open() not called")
+    ensure(closeCalled, "close() not called")
+  }
+
+  // Fails with `message` unless `flag` is set.
+  private def ensure(flag: Boolean, message: String): Unit =
+    if (!flag) throw new AssertionError(message)
+}