You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/02/17 16:24:18 UTC
[8/8] flink git commit: [FLINK-4997] [streaming] Add
ProcessWindowFunction to Scala API
[FLINK-4997] [streaming] Add ProcessWindowFunction to Scala API
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86dff0e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86dff0e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86dff0e6
Branch: refs/heads/master
Commit: 86dff0e6d584027994dd1320845169cc8b1a83d5
Parents: 1dcb2dc
Author: Ventura Del Monte <ve...@gmail.com>
Authored: Wed Nov 9 10:49:47 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 17 17:15:51 2017 +0100
----------------------------------------------------------------------
.../streaming/api/scala/WindowedStream.scala | 144 ++++++++++++++++++-
.../scala/function/ProcessWindowFunction.scala | 61 ++++++++
.../function/RichProcessWindowFunction.scala | 87 +++++++++++
.../ScalaProcessWindowFunctionWrapper.scala | 64 +++++++++
.../streaming/api/scala/WindowFoldITCase.scala | 64 ++++++++-
.../api/scala/WindowFunctionITCase.scala | 54 ++++++-
.../api/scala/WindowReduceITCase.scala | 64 ++++++++-
...ckingIdentityRichProcessWindowFunction.scala | 81 +++++++++++
8 files changed, 605 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index ab27820..96ff334 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -18,14 +18,16 @@
package org.apache.flink.streaming.api.scala
+import org.apache.flink.annotation.{Public, PublicEvolving}
+import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction}
import org.apache.flink.annotation.{PublicEvolving, Public}
import org.apache.flink.api.common.functions.{AggregateFunction, FoldFunction, ReduceFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream}
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
-import org.apache.flink.streaming.api.scala.function.WindowFunction
-import org.apache.flink.streaming.api.scala.function.util.{ScalaFoldFunction, ScalaReduceFunction, ScalaWindowFunction, ScalaWindowFunctionWrapper}
+import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
+import org.apache.flink.streaming.api.scala.function.util._
import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger
@@ -99,7 +101,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
// ------------------------------------------------------------------------
// --------------------------- reduce() -----------------------------------
-
+
/**
* Applies a reduce function to the window. The window function is called for each evaluation
* of the window for each key individually. The output of the reduce function is interpreted
@@ -198,10 +200,58 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
asScalaStream(javaStream.reduce(reducer, applyFunction, implicitly[TypeInformation[R]]))
}
+
+  /**
+   * Reduces the elements of every window with the given reduce function and hands the reduced
+   * value to the given process window function. The process window function is called for each
+   * evaluation of the window for each key individually; its output is interpreted as a regular
+   * non-windowed stream.
+   *
+   * @param preAggregator The reduce function that is used for pre-aggregation
+   * @param function The process window function.
+   * @return The data stream that is the result of applying the window function to the window.
+   */
+  @PublicEvolving
+  def reduce[R: TypeInformation](
+      preAggregator: (T, T) => T,
+      function: ProcessWindowFunction[T, R, K, W]): DataStream[R] = {
+
+    // Adapt the Scala closure and the Scala process window function to their Java counterparts.
+    val javaReducer = new ScalaReduceFunction[T](clean(preAggregator))
+    val javaWindowFunction =
+      new ScalaProcessWindowFunctionWrapper[T, R, K, W](clean(function))
+
+    asScalaStream(
+      javaStream.reduce(javaReducer, javaWindowFunction, implicitly[TypeInformation[R]]))
+  }
+
+  /**
+   * Reduces the elements of every window with the given [[ReduceFunction]] and hands the reduced
+   * value to the given process window function. The process window function is called for each
+   * evaluation of the window for each key individually; its output is interpreted as a regular
+   * non-windowed stream.
+   *
+   * @param preAggregator The reduce function that is used for pre-aggregation
+   * @param function The process window function.
+   * @return The data stream that is the result of applying the window function to the window.
+   */
+  @PublicEvolving
+  def reduce[R: TypeInformation](
+      preAggregator: ReduceFunction[T],
+      function: ProcessWindowFunction[T, R, K, W]): DataStream[R] = {
+
+    // The Java reduce function can be passed through as-is; only the window function needs wrapping.
+    val reducer = clean(preAggregator)
+    val windowFunction =
+      new ScalaProcessWindowFunctionWrapper[T, R, K, W](clean(function))
+    val outType = implicitly[TypeInformation[R]]
+
+    asScalaStream(javaStream.reduce(reducer, windowFunction, outType))
+  }
+
// -------------------------- aggregate() ---------------------------------
/**
- * Applies the given aggregation function to each window and key. The aggregation function
+ * Applies the given aggregation function to each window and key. The aggregation function
* is called for each element, aggregating values incrementally and keeping the state to
* one accumulator per key and window.
*
@@ -213,7 +263,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
-
+
asScalaStream(javaStream.aggregate(
clean(aggregateFunction), accumulatorType, resultType))
}
@@ -241,7 +291,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
val aggregationResultType: TypeInformation[V] = implicitly[TypeInformation[V]]
val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
-
+
asScalaStream(javaStream.aggregate(
cleanedPreAggregator, applyFunction,
accumulatorType, aggregationResultType, resultType))
@@ -277,7 +327,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
}
// ---------------------------- fold() ------------------------------------
-
+
/**
* Applies the given fold function to each window. The window function is called for each
* evaluation of the window for each key individually. The output of the reduce function is
@@ -379,9 +429,89 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
asScalaStream(javaStream.fold(initialValue, folder, applyFunction, accType, resultType))
}
+
+  /**
+   * Incrementally folds the elements of every window with the given fold function, starting
+   * from `initialValue`, and hands the folded value to the given process window function.
+   * The output of the process window function is interpreted as a regular non-windowed stream.
+   *
+   * @param initialValue The initial value of the fold
+   * @param foldFunction The fold function that is used for incremental aggregation
+   * @param function The process window function.
+   * @return The data stream that is the result of applying the window function to the window.
+   */
+  @PublicEvolving
+  def fold[R: TypeInformation, ACC: TypeInformation](
+      initialValue: ACC,
+      foldFunction: (ACC, T) => ACC,
+      function: ProcessWindowFunction[ACC, R, K, W]): DataStream[R] = {
+
+    val windowFunction =
+      new ScalaProcessWindowFunctionWrapper[ACC, R, K, W](clean(function))
+    val folder = new ScalaFoldFunction[T, ACC](clean(foldFunction))
+
+    val accType = implicitly[TypeInformation[ACC]]
+    val resultType = implicitly[TypeInformation[R]]
+
+    asScalaStream(javaStream.fold(initialValue, folder, windowFunction, accType, resultType))
+  }
+
+  /**
+   * Incrementally folds the elements of every window with the given [[FoldFunction]], starting
+   * from `initialValue`, and hands the folded value to the given process window function.
+   * The output of the process window function is interpreted as a regular non-windowed stream.
+   *
+   * @param initialValue The initial value of the fold
+   * @param foldFunction The fold function that is used for incremental aggregation
+   * @param function The process window function.
+   * @return The data stream that is the result of applying the window function to the window.
+   */
+  @PublicEvolving
+  def fold[R: TypeInformation, ACC: TypeInformation](
+      initialValue: ACC,
+      foldFunction: FoldFunction[T, ACC],
+      function: ProcessWindowFunction[ACC, R, K, W]): DataStream[R] = {
+
+    val windowFunction =
+      new ScalaProcessWindowFunctionWrapper[ACC, R, K, W](clean(function))
+    // Already a Java FoldFunction: only closure cleaning is needed, no wrapping.
+    val folder = clean(foldFunction)
+
+    val accType = implicitly[TypeInformation[ACC]]
+    val resultType = implicitly[TypeInformation[R]]
+
+    asScalaStream(javaStream.fold(initialValue, folder, windowFunction, accType, resultType))
+  }
+
// ---------------------------- apply() -------------------------------------
/**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * Not that this function requires that all data in the windows is buffered until the window
+ * is evaluated, as the function provides no means of pre-aggregation.
+ *
+ * @param function The window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @PublicEvolving
+ def process[R: TypeInformation](
+ function: ProcessWindowFunction[T, R, K, W]): DataStream[R] = {
+
+ val cleanFunction = clean(function)
+ val applyFunction = new ScalaProcessWindowFunctionWrapper[T, R, K, W](cleanFunction)
+ asScalaStream(javaStream.process(applyFunction, implicitly[TypeInformation[R]]))
+ }
+
+ /**
* Applies the given window function to each window. The window function is called for each
* evaluation of the window for each key individually. The output of the window function is
* interpreted as a regular non-windowed stream.
http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
new file mode 100644
index 0000000..79f3918
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function
+
+import java.io.Serializable
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.functions.Function
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+/**
+ * Base abstract class for functions that are evaluated over keyed (grouped)
+ * windows using a context for retrieving extra information.
+ *
+ * @tparam IN The type of the input value.
+ * @tparam OUT The type of the output value.
+ * @tparam KEY The type of the key.
+ * @tparam W The type of the window.
+ */
+@PublicEvolving
+abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
+
+  /**
+   * Evaluates the window and outputs zero or more elements.
+   *
+   * @param key The key for which this window is evaluated.
+   * @param context The context in which the window is being evaluated.
+   * @param elements The elements in the window being evaluated.
+   * @param out A collector for emitting elements.
+   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+   */
+  @throws[Exception]
+  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
+
+  /**
+   * The context holding window metadata.
+   */
+  abstract class Context {
+    /**
+     * @return The window that is being evaluated.
+     */
+    def window: W
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
new file mode 100644
index 0000000..320685a
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function
+
+import java.beans.Transient
+
+import org.apache.flink.annotation.Public
+import org.apache.flink.api.common.functions.{IterationRuntimeContext, RichFunction, RuntimeContext}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+ * Rich variant of [[ProcessWindowFunction]]: a base abstract class for functions that are
+ * evaluated over keyed (grouped) windows using a context for retrieving extra information and
+ * that, as a [[RichFunction]], additionally get a [[RuntimeContext]] and the `open()` /
+ * `close()` life cycle methods.
+ *
+ * @tparam IN The type of the input value.
+ * @tparam OUT The type of the output value.
+ * @tparam KEY The type of the key.
+ * @tparam W The type of the window.
+ */
+// NOTE(review): the parent ProcessWindowFunction is @PublicEvolving; marking this subclass
+// @Public promises a stronger stability guarantee than its parent offers - confirm intent.
+@Public
+abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window]
+  extends ProcessWindowFunction[IN, OUT, KEY, W]
+  with RichFunction {
+
+  // Scala's `@transient` excludes this field from Java serialization of the (Serializable)
+  // function. `java.beans.Transient`, used before, only affects JavaBeans introspection
+  // (e.g. XMLEncoder) and does not stop Java serialization from including the runtime context.
+  @transient
+  private var runtimeContext: RuntimeContext = null
+
+  // --------------------------------------------------------------------------------------------
+  // Runtime context access
+  // --------------------------------------------------------------------------------------------
+
+  override def setRuntimeContext(t: RuntimeContext): Unit = {
+    this.runtimeContext = t
+  }
+
+  /**
+   * @return The runtime context previously passed to `setRuntimeContext`.
+   * @throws IllegalStateException if no runtime context has been set yet.
+   */
+  override def getRuntimeContext: RuntimeContext = {
+    if (this.runtimeContext == null) {
+      throw new IllegalStateException("The runtime context has not been initialized.")
+    }
+    this.runtimeContext
+  }
+
+  /**
+   * @return The runtime context, if it is an [[IterationRuntimeContext]].
+   * @throws IllegalStateException if no runtime context has been set yet, or if it is not an
+   *                               iteration runtime context.
+   */
+  override def getIterationRuntimeContext: IterationRuntimeContext = this.runtimeContext match {
+    case null =>
+      throw new IllegalStateException("The runtime context has not been initialized.")
+    case iterationRuntimeContext: IterationRuntimeContext =>
+      iterationRuntimeContext
+    case _ =>
+      throw new IllegalStateException("This stub is not part of an iteration step function.")
+  }
+
+  // --------------------------------------------------------------------------------------------
+  // Default life cycle methods (no-ops; override as needed)
+  // --------------------------------------------------------------------------------------------
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {}
+
+  @throws[Exception]
+  override def close(): Unit = {}
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
new file mode 100644
index 0000000..4a20371
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function.util
+
+import org.apache.flink.api.common.functions.{IterationRuntimeContext, RuntimeContext}
+import org.apache.flink.api.java.operators.translation.WrappingFunction
+import org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => JProcessWindowFunction}
+import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+
+/**
+ * A wrapper function that exposes a Scala [[ProcessWindowFunction]] as a Java
+ * `org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction`.
+ *
+ * The Scala and Java process window functions differ in their type of "Iterable" and in
+ * the evaluation context they receive:
+ *  - Scala ProcessWindowFunction: scala.Iterable and the Scala function's own Context
+ *  - Java ProcessWindowFunction: java.lang.Iterable and the Java function's Context
+ *
+ * The wrapper converts both before delegating to the wrapped Scala function.
+ */
+final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
+    private[this] val func: ProcessWindowFunction[IN, OUT, KEY, W])
+  extends WrappingFunction[ProcessWindowFunction[IN, OUT, KEY, W]](func)
+  with JProcessWindowFunctionTrait[IN, OUT, KEY, W] {
+
+  override def process(
+      key: KEY,
+      context: JProcessWindowFunction[IN, OUT, KEY, W]#Context,
+      elements: java.lang.Iterable[IN],
+      out: Collector[OUT]): Unit = {
+    // Adapt the Java context to the Scala one; the window is read from the Java context on
+    // every access.
+    val ctx = new func.Context {
+      override def window: W = context.window
+    }
+    func.process(key, ctx, elements.asScala, out)
+  }
+
+  // The wrapper does not hand out a runtime context of its own; per the exception message a call
+  // here is a programming error. NOTE(review): confirm no runtime path queries the wrapper.
+  override def getRuntimeContext: RuntimeContext = {
+    throw new RuntimeException("This should never be called")
+  }
+
+  override def getIterationRuntimeContext: IterationRuntimeContext = {
+    throw new RuntimeException("This should never be called")
+  }
+}
+
+/**
+ * Mixin through which [[ScalaProcessWindowFunctionWrapper]] takes on the Java process window
+ * function type while its base class is WrappingFunction. `W` carries the same `<: Window`
+ * bound as the Scala and Java process window functions so the parent's bound is satisfied.
+ */
+private trait JProcessWindowFunctionTrait[IN, OUT, KEY, W <: Window]
+  extends JProcessWindowFunction[IN, OUT, KEY, W]
http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
index 83697ce..a23145c 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
@@ -26,13 +26,13 @@ import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichWindowFunction, CheckingIdentityRichAllWindowFunction}
+import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Test
+import org.junit.{Ignore, Test}
import org.junit.Assert._
import scala.collection.mutable
@@ -150,6 +150,66 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
}
@Test
+ @Ignore
+ def testFoldWithProcessWindowFunction(): Unit = {
+ WindowFoldITCase.testResults = mutable.MutableList()
+ CheckingIdentityRichProcessWindowFunction.reset()
+
+ val foldFunc = new FoldFunction[(String, Int), (Int, String)] {
+ override def fold(accumulator: (Int, String), value: (String, Int)): (Int, String) = {
+ (accumulator._1 + value._2, accumulator._2 + value._1)
+ }
+ }
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setParallelism(1)
+
+ val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+ def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+ ctx.collect(("a", 0))
+ ctx.collect(("a", 1))
+ ctx.collect(("a", 2))
+ ctx.collect(("b", 3))
+ ctx.collect(("b", 4))
+ ctx.collect(("b", 5))
+ ctx.collect(("a", 6))
+ ctx.collect(("a", 7))
+ ctx.collect(("a", 8))
+
+ // source is finite, so it will have an implicit MAX watermark when it finishes
+ }
+
+ def cancel() {
+ }
+ }).assignTimestampsAndWatermarks(new WindowFoldITCase.Tuple2TimestampExtractor)
+
+ source1
+ .keyBy(0)
+ .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+ .fold(
+ (0, "R:"),
+ foldFunc,
+ new CheckingIdentityRichProcessWindowFunction[(Int, String), Tuple, TimeWindow]())
+ .addSink(new SinkFunction[(Int, String)]() {
+ def invoke(value: (Int, String)) {
+ WindowFoldITCase.testResults += value.toString
+ }
+ })
+
+ env.execute("Fold Process Window Test")
+
+ val expectedResult = mutable.MutableList(
+ "(3,R:aaa)",
+ "(21,R:aaa)",
+ "(12,R:bbb)")
+
+ assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted)
+
+ CheckingIdentityRichProcessWindowFunction.checkRichMethodCalls()
+ }
+
+ @Test
def testFoldAllWindow(): Unit = {
WindowFoldITCase.testResults = mutable.MutableList()
http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
index c38f422..bfbe6ee 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
@@ -25,13 +25,13 @@ import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichWindowFunction}
+import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Ignore, Test}
import scala.collection.mutable
@@ -87,6 +87,56 @@ class WindowFunctionITCase {
}
@Test
+ @Ignore
+ def testRichProcessWindowFunction(): Unit = {
+ WindowFunctionITCase.testResults = mutable.MutableList()
+ CheckingIdentityRichProcessWindowFunction.reset()
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setParallelism(1)
+
+ val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+ def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+ ctx.collect(("a", 0))
+ ctx.collect(("a", 1))
+ ctx.collect(("a", 2))
+ ctx.collect(("b", 3))
+ ctx.collect(("b", 4))
+ ctx.collect(("b", 5))
+ ctx.collect(("a", 6))
+ ctx.collect(("a", 7))
+ ctx.collect(("a", 8))
+
+ // source is finite, so it will have an implicit MAX watermark when it finishes
+ }
+
+ def cancel() {}
+
+ }).assignTimestampsAndWatermarks(new WindowFunctionITCase.Tuple2TimestampExtractor)
+
+ source1
+ .keyBy(0)
+ .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+ .process(new CheckingIdentityRichProcessWindowFunction[(String, Int), Tuple, TimeWindow]())
+ .addSink(new SinkFunction[(String, Int)]() {
+ def invoke(value: (String, Int)) {
+ WindowFunctionITCase.testResults += value.toString
+ }
+ })
+
+ env.execute("RichProcessWindowFunction Test")
+
+ val expectedResult = mutable.MutableList(
+ "(a,0)", "(a,1)", "(a,2)", "(a,6)", "(a,7)", "(a,8)",
+ "(b,3)", "(b,4)", "(b,5)")
+
+ assertEquals(expectedResult.sorted, WindowFunctionITCase.testResults.sorted)
+
+ CheckingIdentityRichProcessWindowFunction.checkRichMethodCalls()
+ }
+
+ @Test
def testRichAllWindowFunction(): Unit = {
WindowFunctionITCase.testResults = mutable.MutableList()
CheckingIdentityRichAllWindowFunction.reset()
http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
index 9666266..5418108 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
@@ -26,15 +26,14 @@ import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichWindowFunction}
+import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-
import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Ignore, Test}
import scala.collection.mutable
@@ -150,6 +149,65 @@ class WindowReduceITCase extends StreamingMultipleProgramsTestBase {
}
@Test
+ @Ignore
+ def testReduceWithProcessWindowFunction(): Unit = {
+ WindowReduceITCase.testResults = mutable.MutableList()
+ CheckingIdentityRichProcessWindowFunction.reset()
+
+ val reduceFunc = new ReduceFunction[(String, Int)] {
+ override def reduce(a: (String, Int), b: (String, Int)): (String, Int) = {
+ (a._1 + b._1, a._2 + b._2)
+ }
+ }
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setParallelism(1)
+
+ val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+ def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+ ctx.collect(("a", 0))
+ ctx.collect(("a", 1))
+ ctx.collect(("a", 2))
+ ctx.collect(("b", 3))
+ ctx.collect(("b", 4))
+ ctx.collect(("b", 5))
+ ctx.collect(("a", 6))
+ ctx.collect(("a", 7))
+ ctx.collect(("a", 8))
+
+ // source is finite, so it will have an implicit MAX watermark when it finishes
+ }
+
+ def cancel() {
+ }
+ }).assignTimestampsAndWatermarks(new WindowReduceITCase.Tuple2TimestampExtractor)
+
+ source1
+ .keyBy(0)
+ .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+ .reduce(
+ reduceFunc,
+ new CheckingIdentityRichProcessWindowFunction[(String, Int), Tuple, TimeWindow]())
+ .addSink(new SinkFunction[(String, Int)]() {
+ def invoke(value: (String, Int)) {
+ WindowReduceITCase.testResults += value.toString
+ }
+ })
+
+ env.execute("Reduce Process Window Test")
+
+ val expectedResult = mutable.MutableList(
+ "(aaa,3)",
+ "(aaa,21)",
+ "(bbb,12)")
+
+ assertEquals(expectedResult.sorted, WindowReduceITCase.testResults.sorted)
+
+ CheckingIdentityRichProcessWindowFunction.checkRichMethodCalls()
+ }
+
+ @Test
def testReduceAllWindow(): Unit = {
WindowReduceITCase.testResults = mutable.MutableList()
http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
new file mode 100644
index 0000000..d62f2d3
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.testutils
+
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.scala.function.RichProcessWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+
+/**
+ * Identity [[RichProcessWindowFunction]] for tests: emits every element of a window unchanged
+ * and records in its companion object that `setRuntimeContext`, `open` and `close` were called.
+ */
+class CheckingIdentityRichProcessWindowFunction[T, K, W <: Window]
+  extends RichProcessWindowFunction[T, T, K, W] {
+
+  override def process(key: K, context: Context, input: Iterable[T], out: Collector[T]): Unit =
+    input.foreach(element => out.collect(element))
+
+  override def setRuntimeContext(context: RuntimeContext): Unit = {
+    super.setRuntimeContext(context)
+    CheckingIdentityRichProcessWindowFunction.contextSet = true
+  }
+
+  override def open(conf: Configuration): Unit = {
+    super.open(conf)
+    CheckingIdentityRichProcessWindowFunction.openCalled = true
+  }
+
+  override def close(): Unit = {
+    super.close()
+    CheckingIdentityRichProcessWindowFunction.closeCalled = true
+  }
+}
+
+/**
+ * Shared flags recording which rich-function methods of
+ * [[CheckingIdentityRichProcessWindowFunction]] instances were called, plus helpers to reset
+ * and verify them.
+ */
+object CheckingIdentityRichProcessWindowFunction {
+
+  // @volatile so that writes made by function instances are visible to the checking thread.
+  @volatile
+  private[CheckingIdentityRichProcessWindowFunction] var closeCalled = false
+
+  @volatile
+  private[CheckingIdentityRichProcessWindowFunction] var openCalled = false
+
+  @volatile
+  private[CheckingIdentityRichProcessWindowFunction] var contextSet = false
+
+  /** Clears all recorded flags. */
+  def reset(): Unit = {
+    contextSet = false
+    openCalled = false
+    closeCalled = false
+  }
+
+  /**
+   * Verifies that the runtime context was set and that `open()` and `close()` were called.
+   *
+   * @throws AssertionError naming the first of these checks (in that order) that failed.
+   */
+  def checkRichMethodCalls(): Unit = {
+    def verify(happened: Boolean, failureMessage: String): Unit =
+      if (!happened) throw new AssertionError(failureMessage)
+
+    verify(contextSet, "context not set")
+    verify(openCalled, "open() not called")
+    verify(closeCalled, "close() not called")
+  }
+}