You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/02/17 16:24:11 UTC
[1/8] flink git commit: [FLINK-5237] Consolidate and harmonize Window
Translation Tests
Repository: flink
Updated Branches:
refs/heads/master 09fe4b0b8 -> 5368a7d32
http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index bd3fe3d..600645b 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -24,8 +24,8 @@ import org.apache.flink.api.common.state.{AggregatingStateDescriptor, FoldingSta
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator
-import org.apache.flink.streaming.api.scala.function.WindowFunction
+import org.apache.flink.streaming.api.operators.{OneInputStreamOperator, OutputTypeConfigurable}
+import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
@@ -367,6 +367,90 @@ class WindowTranslationTest {
}
@Test
+ def testReduceWithProcessWindowFunctionEventTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .keyBy(_._1)
+ .window(TumblingEventTimeWindows.of(Time.seconds(1)))
+ .reduce(
+ new DummyReducer,
+ new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+ override def process(
+ key: String,
+ window: Context,
+ input: Iterable[(String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
+ })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
+ def testReduceWithProcessWindowFunctionProcessingTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .keyBy(_._1)
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
+ .reduce(
+ new DummyReducer,
+ new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+ override def process(
+ key: String,
+ window: Context,
+ input: Iterable[(String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
+ })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
def testApplyWithPreReducerEventTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -408,6 +492,50 @@ class WindowTranslationTest {
}
@Test
+ def testApplyWithPreReducerAndEvictor() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .keyBy(_._1)
+ .window(TumblingEventTimeWindows.of(Time.seconds(1)))
+ .evictor(CountEvictor.of(100))
+ .apply(
+ new DummyReducer,
+ new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+ override def apply(
+ key: String,
+ window: TimeWindow,
+ input: Iterable[(String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
+ })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+
+ @Test
def testReduceWithWindowFunctionEventTimeWithScalaFunction() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -584,6 +712,74 @@ class WindowTranslationTest {
}
@Test
+ def testAggregateWithProcessWindowFunctionEventTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .keyBy(_._1)
+ .window(TumblingEventTimeWindows.of(Time.seconds(1)))
+ .aggregate(new DummyAggregator(), new TestProcessWindowFunction())
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[AggregatingStateDescriptor[_, _, _]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
+ def testAggregateWithProcessWindowFunctionProcessingTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .keyBy(_._1)
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
+ .aggregate(new DummyAggregator(), new TestProcessWindowFunction())
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[AggregatingStateDescriptor[_, _, _]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
def testAggregateWithWindowFunctionEventTimeWithScalaFunction() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -814,7 +1010,7 @@ class WindowTranslationTest {
}
@Test
- def testApplyWithPreFolderEventTime() {
+ def testFoldWithProcessWindowFunctionEventTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -823,16 +1019,15 @@ class WindowTranslationTest {
val window1 = source
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
- .apply(
+ .fold(
("", "", 1),
new DummyFolder,
- new WindowFunction[(String, String, Int), (String, String, Int), String, TimeWindow] {
- override def apply(
+ new ProcessWindowFunction[(String, String, Int), (String, Int), String, TimeWindow] {
+ override def process(
key: String,
- window: TimeWindow,
+ window: Context,
input: Iterable[(String, String, Int)],
- out: Collector[(String, String, Int)]): Unit =
- input foreach {x => out.collect((x._1, x._2, x._3))}
+ out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._3))}
})
val transform = window1
@@ -858,20 +1053,24 @@ class WindowTranslationTest {
}
@Test
- def testFoldWithWindowFunctionEventTimeWithScalaFunction() {
+ def testFoldWithProcessWindowFunctionProcessingTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val source = env.fromElements(("hello", 1), ("hello", 2))
val window1 = source
.keyBy(_._1)
- .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
.fold(
("", "", 1),
- { (acc: (String, String, Int), _) => acc },
- { (_, _, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) =>
- in foreach { x => out.collect((x._1, x._3)) }
+ new DummyFolder,
+ new ProcessWindowFunction[(String, String, Int), (String, Int), String, TimeWindow] {
+ override def process(
+ key: String,
+ window: Context,
+ input: Iterable[(String, String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._3))}
})
val transform = window1
@@ -885,8 +1084,8 @@ class WindowTranslationTest {
val winOperator = operator
.asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
- assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
- assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+ assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
processElementAndEnsureOutput[String, (String, Int), (String, Int)](
@@ -896,12 +1095,8 @@ class WindowTranslationTest {
("hello", 1))
}
- // --------------------------------------------------------------------------
- // apply() tests
- // --------------------------------------------------------------------------
-
@Test
- def testApplyEventTime() {
+ def testApplyWithPreFolderEventTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -911,12 +1106,15 @@ class WindowTranslationTest {
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
.apply(
- new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+ ("", "", 1),
+ new DummyFolder,
+ new WindowFunction[(String, String, Int), (String, String, Int), String, TimeWindow] {
override def apply(
key: String,
window: TimeWindow,
- input: Iterable[(String, Int)],
- out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))}
+ input: Iterable[(String, String, Int)],
+ out: Collector[(String, String, Int)]): Unit =
+ input foreach {x => out.collect((x._1, x._2, x._3))}
})
val transform = window1
@@ -932,7 +1130,7 @@ class WindowTranslationTest {
assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
- assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
processElementAndEnsureOutput[String, (String, Int), (String, Int)](
winOperator,
@@ -942,22 +1140,26 @@ class WindowTranslationTest {
}
@Test
- def testApplyProcessingTimeTime() {
+ def testApplyWithPreFolderAndEvictor() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
val source = env.fromElements(("hello", 1), ("hello", 2))
val window1 = source
.keyBy(_._1)
- .window(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .evictor(CountEvictor.of(100))
.apply(
- new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+ ("", "", 1),
+ new DummyFolder,
+ new WindowFunction[(String, String, Int), (String, String, Int), String, TimeWindow] {
override def apply(
- key: String,
- window: TimeWindow,
- input: Iterable[(String, Int)],
- out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))}
+ key: String,
+ window: TimeWindow,
+ input: Iterable[(String, String, Int)],
+ out: Collector[(String, String, Int)]): Unit =
+ input foreach {x => out.collect((x._1, x._2, x._3))}
})
val transform = window1
@@ -971,8 +1173,8 @@ class WindowTranslationTest {
val winOperator = operator
.asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
- assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
- assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
processElementAndEnsureOutput[String, (String, Int), (String, Int)](
@@ -983,7 +1185,7 @@ class WindowTranslationTest {
}
@Test
- def testApplyEventTimeWithScalaFunction() {
+ def testFoldWithWindowFunctionEventTimeWithScalaFunction() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -992,9 +1194,12 @@ class WindowTranslationTest {
val window1 = source
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
- .apply { (key, window, in, out: Collector[(String, Int)]) =>
- in foreach { x => out.collect(x)}
- }
+ .fold(
+ ("", "", 1),
+ { (acc: (String, String, Int), _) => acc },
+ { (_, _, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) =>
+ in foreach { x => out.collect((x._1, x._3)) }
+ })
val transform = window1
.javaStream
@@ -1009,7 +1214,211 @@ class WindowTranslationTest {
assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
- assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ // --------------------------------------------------------------------------
+ // apply() tests
+ // --------------------------------------------------------------------------
+
+ @Test
+ def testApplyEventTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .keyBy(_._1)
+ .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .apply(
+ new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+ override def apply(
+ key: String,
+ window: TimeWindow,
+ input: Iterable[(String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))}
+ })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
+ def testApplyProcessingTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .keyBy(_._1)
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .apply(
+ new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+ override def apply(
+ key: String,
+ window: TimeWindow,
+ input: Iterable[(String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))}
+ })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
+ def testProcessEventTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .keyBy(_._1)
+ .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .process(
+ new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+ override def process(
+ key: String,
+ window: Context,
+ input: Iterable[(String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))}
+ })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
+ def testProcessProcessingTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .keyBy(_._1)
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .process(
+ new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+ override def process(
+ key: String,
+ window: Context,
+ input: Iterable[(String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))}
+ })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
+ def testApplyEventTimeWithScalaFunction() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .keyBy(_._1)
+ .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .apply { (key, window, in, out: Collector[(String, Int)]) =>
+ in foreach { x => out.collect(x)}
+ }
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
processElementAndEnsureOutput[String, (String, Int), (String, Int)](
winOperator,
@@ -1132,6 +1541,49 @@ class WindowTranslationTest {
}
@Test
+ def testProcessWithCustomTrigger() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .keyBy(_._1)
+ .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .trigger(CountTrigger.of(1))
+ .process(
+ new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+ override def process(
+ key: String,
+ window: Context,
+ input: Iterable[(String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))}
+ })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[CountTrigger[_]])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+
+ @Test
def testReduceWithEvictor() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -1169,6 +1621,113 @@ class WindowTranslationTest {
}
@Test
+ def testReduceWithEvictorAndProcessFunction() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .keyBy(_._1)
+ .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .evictor(CountEvictor.of(100))
+ .reduce(new DummyReducer, new TestProcessWindowFunction)
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[
+ EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
+ def testAggregateWithEvictor() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .keyBy(_._1)
+ .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .evictor(CountEvictor.of(100))
+ .aggregate(new DummyAggregator())
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
+ def testAggregateWithEvictorAndProcessFunction() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .keyBy(_._1)
+ .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .evictor(CountEvictor.of(100))
+ .aggregate(new DummyAggregator(), new TestProcessWindowFunction)
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
def testFoldWithEvictor() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -1210,6 +1769,48 @@ class WindowTranslationTest {
}
@Test
+ def testFoldWithEvictorAndProcessFunction() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .keyBy(_._1)
+ .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .evictor(CountEvictor.of(100))
+ .fold(("", "", 1), new DummyFolder, new TestFoldProcessWindowFunction)
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[
+ EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+ winOperator.setOutputType(
+ window1.javaStream.getType.asInstanceOf[TypeInformation[(String, Int)]],
+ new ExecutionConfig)
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+
+ @Test
def testApplyWithEvictor() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -1252,6 +1853,48 @@ class WindowTranslationTest {
("hello", 1))
}
+ @Test
+ def testProcessWithEvictor() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .keyBy(_._1)
+ .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .evictor(CountEvictor.of(100))
+ .process(
+ new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+ override def process(
+ key: String,
+ window: Context,
+ input: Iterable[(String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))}
+ })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
/**
* Ensure that we get some output from the given operator when pushing in an element and
@@ -1266,6 +1909,12 @@ class WindowTranslationTest {
val testHarness =
new KeyedOneInputStreamOperatorTestHarness[K, IN, OUT](operator, keySelector, keyType)
+ if (operator.isInstanceOf[OutputTypeConfigurable[String]]) {
+ // use a dummy type since window functions just need the ExecutionConfig
+ // this is also only needed for Fold, which we're getting rid of soon.
+ operator.asInstanceOf[OutputTypeConfigurable[String]]
+ .setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig)
+ }
testHarness.open()
testHarness.setProcessingTime(0)
@@ -1319,16 +1968,45 @@ class DummyRichAggregator extends RichAggregateFunction[(String, Int), (String,
override def add(value: (String, Int), accumulator: (String, Int)): Unit = ()
}
-class TestWindowFunction extends WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+class TestWindowFunction
+ extends WindowFunction[(String, Int), (String, String, Int), String, TimeWindow] {
override def apply(
key: String,
window: TimeWindow,
input: Iterable[(String, Int)],
- out: Collector[(String, Int)]): Unit = {
+ out: Collector[(String, String, Int)]): Unit = {
+
+ input.foreach(e => out.collect((e._1, e._1, e._2)))
+ }
+}
+
+class TestProcessWindowFunction
+ extends ProcessWindowFunction[(String, Int), (String, String, Int), String, TimeWindow] {
+
+ override def process(
+ key: String,
+ window: Context,
+ input: Iterable[(String, Int)],
+ out: Collector[(String, String, Int)]): Unit = {
- input.foreach(out.collect)
+ input.foreach(e => out.collect((e._1, e._1, e._2)))
}
}
+class TestFoldProcessWindowFunction
+ extends ProcessWindowFunction[(String, String, Int), (String, Int), String, TimeWindow] {
+
+ override def process(
+ key: String,
+ window: Context,
+ input: Iterable[(String, String, Int)],
+ out: Collector[(String, Int)]): Unit = {
+
+ input.foreach(e => out.collect((e._1, e._3)))
+ }
+}
+
+
+
[4/8] flink git commit: [hotfix] Fix trailing whitespace in
WindowedStream.java
Posted by al...@apache.org.
[hotfix] Fix trailing whitespace in WindowedStream.java
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/82db667d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/82db667d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/82db667d
Branch: refs/heads/master
Commit: 82db667d319778f30244e9c4212bf3a2920f604f
Parents: 09fe4b0
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Feb 7 10:54:54 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 17 17:15:51 2017 +0100
----------------------------------------------------------------------
.../api/datastream/WindowedStream.java | 34 ++++++++++----------
1 file changed, 17 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/82db667d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 3fbdda8..04da04d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -89,7 +89,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* Note that the {@code WindowedStream} is purely and API construct, during runtime
* the {@code WindowedStream} will be collapsed together with the
* {@code KeyedStream} and the operation over the window into one single operation.
- *
+ *
* @param <T> The type of elements in the stream.
* @param <K> The type of the key by which elements are grouped.
* @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
@@ -190,9 +190,9 @@ public class WindowedStream<T, K, W extends Window> {
* so a few elements are stored per key (one per slide interval).
* Custom windows may not be able to incrementally aggregate, or may need to store extra values
* in an aggregation tree.
- *
+ *
* @param function The reduce function.
- * @return The data stream that is the result of applying the reduce function to the window.
+ * @return The data stream that is the result of applying the reduce function to the window.
*/
@SuppressWarnings("unchecked")
public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
@@ -521,10 +521,10 @@ public class WindowedStream<T, K, W extends Window> {
*
* @param function The aggregation function.
* @return The data stream that is the result of applying the fold function to the window.
- *
+ *
* @param <ACC> The type of the AggregateFunction's accumulator
* @param <R> The type of the elements in the resulting stream, equal to the
- * AggregateFunction's result type
+ * AggregateFunction's result type
*/
public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
checkNotNull(function, "function");
@@ -549,10 +549,10 @@ public class WindowedStream<T, K, W extends Window> {
*
* @param function The aggregation function.
* @return The data stream that is the result of applying the aggregation function to the window.
- *
+ *
* @param <ACC> The type of the AggregateFunction's accumulator
* @param <R> The type of the elements in the resulting stream, equal to the
- * AggregateFunction's result type
+ * AggregateFunction's result type
*/
public <ACC, R> SingleOutputStreamOperator<R> aggregate(
AggregateFunction<T, ACC, R> function,
@@ -581,11 +581,11 @@ public class WindowedStream<T, K, W extends Window> {
*
* @param aggFunction The aggregate function that is used for incremental aggregation.
* @param windowFunction The window function.
- *
+ *
* @return The data stream that is the result of applying the window function to the window.
- *
+ *
* @param <ACC> The type of the AggregateFunction's accumulator
- * @param <V> The type of AggregateFunction's result, and the WindowFunction's input
+ * @param <V> The type of AggregateFunction's result, and the WindowFunction's input
* @param <R> The type of the elements in the resulting stream, equal to the
* WindowFunction's result type
*/
@@ -620,17 +620,17 @@ public class WindowedStream<T, K, W extends Window> {
* @param windowFunction The window function.
* @param accumulatorType Type information for the internal accumulator type of the aggregation function
* @param resultType Type information for the result type of the window function
- *
+ *
* @return The data stream that is the result of applying the window function to the window.
- *
+ *
* @param <ACC> The type of the AggregateFunction's accumulator
- * @param <V> The type of AggregateFunction's result, and the WindowFunction's input
+ * @param <V> The type of AggregateFunction's result, and the WindowFunction's input
* @param <R> The type of the elements in the resulting stream, equal to the
* WindowFunction's result type
*/
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
AggregateFunction<T, ACC, V> aggregateFunction,
- WindowFunction<V, R, K, W> windowFunction,
+ WindowFunction<V, R, K, W> windowFunction,
TypeInformation<ACC> accumulatorType,
TypeInformation<V> aggregateResultType,
TypeInformation<R> resultType) {
@@ -699,7 +699,7 @@ public class WindowedStream<T, K, W extends Window> {
// ------------------------------------------------------------------------
// Window Function (apply)
// ------------------------------------------------------------------------
-
+
/**
* Applies the given window function to each window. The window function is called for each
* evaluation of the window for each key individually. The output of the window function is
@@ -708,7 +708,7 @@ public class WindowedStream<T, K, W extends Window> {
* <p>
* Not that this function requires that all data in the windows is buffered until the window
* is evaluated, as the function provides no means of incremental aggregation.
- *
+ *
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
@@ -1229,7 +1229,7 @@ public class WindowedStream<T, K, W extends Window> {
@SuppressWarnings("unchecked")
OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
new AggregatingProcessingTimeWindowOperator<>(
- reducer, input.getKeySelector(),
+ reducer, input.getKeySelector(),
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
input.getType().createSerializer(getExecutionEnvironment().getConfig()),
windowLength, windowSlide);
[6/8] flink git commit: [FLINK-4997] [streaming] Introduce
ProcessWindowFunction
Posted by al...@apache.org.
[FLINK-4997] [streaming] Introduce ProcessWindowFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1dcb2dcd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1dcb2dcd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1dcb2dcd
Branch: refs/heads/master
Commit: 1dcb2dcd8969941988a4fc7e5488e9272dfd507e
Parents: 82db667
Author: Ventura Del Monte <ve...@gmail.com>
Authored: Wed Nov 23 18:00:23 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 17 17:15:51 2017 +0100
----------------------------------------------------------------------
.../api/datastream/WindowedStream.java | 381 ++++++++++++++---
.../FoldApplyProcessWindowFunction.java | 120 ++++++
.../windowing/ProcessWindowFunction.java | 61 +++
.../ReduceApplyProcessWindowFunction.java | 80 ++++
.../windowing/RichProcessWindowFunction.java | 85 ++++
.../windowing/AccumulatingKeyedTimePanes.java | 12 +-
...ccumulatingProcessingTimeWindowOperator.java | 16 +-
.../InternalIterableProcessWindowFunction.java | 63 +++
...nternalSingleValueProcessWindowFunction.java | 66 +++
.../FoldApplyProcessWindowFunctionTest.java | 155 +++++++
.../operators/FoldApplyWindowFunctionTest.java | 28 +-
.../functions/InternalWindowFunctionTest.java | 101 ++++-
...AlignedProcessingTimeWindowOperatorTest.java | 419 ++++++++++++++++++-
.../operators/windowing/WindowOperatorTest.java | 177 ++++++++
.../streaming/runtime/WindowFoldITCase.java | 78 ++++
15 files changed, 1738 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 04da04d..45eaae5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.datastream;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.AggregateFunction;
@@ -39,8 +40,11 @@ import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.functions.windowing.AggregateApplyWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceApplyProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -60,9 +64,12 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -360,6 +367,98 @@ public class WindowedStream<T, K, W extends Window> {
return input.transform(opName, resultType, operator);
}
+
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>Arriving data is incrementally aggregated using the given reducer.
+ *
+ * @param reduceFunction The reduce function that is used for incremental aggregation.
+ * @param function The window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @PublicEvolving
+ public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function) {
+ TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+ function, ProcessWindowFunction.class, true, true, input.getType(), null, false);
+
+ return reduce(reduceFunction, function, resultType);
+ }
+
+
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>Arriving data is incrementally aggregated using the given reducer.
+ *
+ * @param reduceFunction The reduce function that is used for incremental aggregation.
+ * @param function The window function.
+ * @param resultType Type information for the result type of the window function
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @Internal
+ public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
+ if (reduceFunction instanceof RichFunction) {
+ throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
+ }
+ //clean the closures
+ function = input.getExecutionEnvironment().clean(function);
+ reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
+
+ String callLocation = Utils.getCallLocationName();
+ String udfName = "WindowedStream." + callLocation;
+
+ String opName;
+ KeySelector<T, K> keySel = input.getKeySelector();
+
+ OneInputStreamOperator<T, R> operator;
+
+ if (evictor != null) {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+ (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+ ListStateDescriptor<StreamRecord<T>> stateDesc =
+ new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+ opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+ operator =
+ new EvictingWindowOperator<>(windowAssigner,
+ windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+ keySel,
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ stateDesc,
+ new InternalIterableProcessWindowFunction<>(new ReduceApplyProcessWindowFunction<>(reduceFunction, function)),
+ trigger,
+ evictor,
+ allowedLateness);
+
+ } else {
+ ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
+ reduceFunction,
+ input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+ opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+ operator =
+ new WindowOperator<>(windowAssigner,
+ windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+ keySel,
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ stateDesc,
+ new InternalSingleValueProcessWindowFunction<>(function),
+ trigger,
+ allowedLateness);
+ }
+
+ return input.transform(opName, resultType, operator);
+ }
+
// ------------------------------------------------------------------------
// Fold Function
// ------------------------------------------------------------------------
@@ -510,6 +609,117 @@ public class WindowedStream<T, K, W extends Window> {
return input.transform(opName, resultType, operator);
}
+
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>Arriving data is incrementally aggregated using the given fold function.
+ *
+ * @param initialValue The initial value of the fold.
+ * @param foldFunction The fold function that is used for incremental aggregation.
+ * @param windowFunction The window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @PublicEvolving
+ public <R, ACC> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction) {
+ if (foldFunction instanceof RichFunction) {
+ throw new UnsupportedOperationException("FoldFunction can not be a RichFunction.");
+ }
+
+ TypeInformation<ACC> foldResultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
+ Utils.getCallLocationName(), true);
+
+ TypeInformation<R> windowResultType = TypeExtractor.getUnaryOperatorReturnType(
+ windowFunction, ProcessWindowFunction.class, true, true, foldResultType, Utils.getCallLocationName(), false);
+
+ return fold(initialValue, foldFunction, windowFunction, foldResultType, windowResultType);
+ }
+
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>Arriving data is incrementally aggregated using the given fold function.
+ *
+ * @param initialValue the initial value to be passed to the first invocation of the fold function
+ * @param foldFunction The fold function.
+ * @param foldResultType The result type of the fold function.
+ * @param windowFunction The process window function.
+ * @param windowResultType The process window function result type.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @Internal
+ public <R, ACC> SingleOutputStreamOperator<R> fold(
+ ACC initialValue,
+ FoldFunction<T, ACC> foldFunction,
+ ProcessWindowFunction<ACC, R, K, W> windowFunction,
+ TypeInformation<ACC> foldResultType,
+ TypeInformation<R> windowResultType) {
+ if (foldFunction instanceof RichFunction) {
+ throw new UnsupportedOperationException("FoldFunction can not be a RichFunction.");
+ }
+ if (windowAssigner instanceof MergingWindowAssigner) {
+ throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
+ }
+
+ //clean the closures
+ windowFunction = input.getExecutionEnvironment().clean(windowFunction);
+ foldFunction = input.getExecutionEnvironment().clean(foldFunction);
+
+ String callLocation = Utils.getCallLocationName();
+ String udfName = "WindowedStream." + callLocation;
+
+ String opName;
+ KeySelector<T, K> keySel = input.getKeySelector();
+
+ OneInputStreamOperator<T, R> operator;
+
+ if (evictor != null) {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+ (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+ ListStateDescriptor<StreamRecord<T>> stateDesc =
+ new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+ opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+ operator =
+ new EvictingWindowOperator<>(windowAssigner,
+ windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+ keySel,
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ stateDesc,
+ new InternalIterableProcessWindowFunction<>(new FoldApplyProcessWindowFunction<>(initialValue, foldFunction, windowFunction, foldResultType)),
+ trigger,
+ evictor,
+ allowedLateness);
+
+ } else {
+ FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents",
+ initialValue,
+ foldFunction,
+ foldResultType.createSerializer(getExecutionEnvironment().getConfig()));
+
+ opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+ operator =
+ new WindowOperator<>(windowAssigner,
+ windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+ keySel,
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ stateDesc,
+ new InternalSingleValueProcessWindowFunction<>(windowFunction),
+ trigger,
+ allowedLateness);
+ }
+
+ return input.transform(opName, windowResultType, operator);
+ }
+
// ------------------------------------------------------------------------
// Aggregation Function
// ------------------------------------------------------------------------
@@ -733,11 +943,53 @@ public class WindowedStream<T, K, W extends Window> {
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
-
- //clean the closure
+ String callLocation = Utils.getCallLocationName();
function = input.getExecutionEnvironment().clean(function);
+ return apply(new InternalIterableWindowFunction<>(function), resultType, callLocation);
+ }
+
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>
+ * Note that this function requires that all data in the windows is buffered until the window
+ * is evaluated, as the function provides no means of incremental aggregation.
+ *
+ * @param function The window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @PublicEvolving
+ public <R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function) {
+ TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+ function, ProcessWindowFunction.class, true, true, getInputType(), null, false);
+
+ return process(function, resultType);
+ }
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>
+ * Note that this function requires that all data in the windows is buffered until the window
+ * is evaluated, as the function provides no means of incremental aggregation.
+ *
+ * @param function The window function.
+ * @param resultType Type information for the result type of the window function
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @Internal
+ public <R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
String callLocation = Utils.getCallLocationName();
+ function = input.getExecutionEnvironment().clean(function);
+ return apply(new InternalIterableProcessWindowFunction<>(function), resultType, callLocation);
+ }
+
+ private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, String callLocation) {
+
String udfName = "WindowedStream." + callLocation;
SingleOutputStreamOperator<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
@@ -767,7 +1019,7 @@ public class WindowedStream<T, K, W extends Window> {
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
- new InternalIterableWindowFunction<>(function),
+ function,
trigger,
evictor,
allowedLateness);
@@ -784,7 +1036,7 @@ public class WindowedStream<T, K, W extends Window> {
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
- new InternalIterableWindowFunction<>(function),
+ function,
trigger,
allowedLateness,
legacyWindowOpType);
@@ -1211,7 +1463,7 @@ public class WindowedStream<T, K, W extends Window> {
}
private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
- Function function,
+ ReduceFunction<?> function,
TypeInformation<R> resultType,
String functionName) {
@@ -1222,30 +1474,18 @@ public class WindowedStream<T, K, W extends Window> {
String opName = "Fast " + timeWindows + " of " + functionName;
- if (function instanceof ReduceFunction) {
- @SuppressWarnings("unchecked")
- ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
- @SuppressWarnings("unchecked")
- OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
- new AggregatingProcessingTimeWindowOperator<>(
- reducer, input.getKeySelector(),
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- input.getType().createSerializer(getExecutionEnvironment().getConfig()),
- windowLength, windowSlide);
- return input.transform(opName, resultType, op);
- }
- else if (function instanceof WindowFunction) {
- @SuppressWarnings("unchecked")
- WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
-
- OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
- wf, input.getKeySelector(),
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- input.getType().createSerializer(getExecutionEnvironment().getConfig()),
- windowLength, windowSlide);
- return input.transform(opName, resultType, op);
- }
+ @SuppressWarnings("unchecked")
+ ReduceFunction<T> reducer = (ReduceFunction<T>) function;
+
+ @SuppressWarnings("unchecked")
+ OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
+ new AggregatingProcessingTimeWindowOperator<>(
+ reducer, input.getKeySelector(),
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+ windowLength, windowSlide);
+ return input.transform(opName, resultType, op);
+
} else if (windowAssigner.getClass() == TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner;
final long windowLength = timeWindows.getSize();
@@ -1253,36 +1493,69 @@ public class WindowedStream<T, K, W extends Window> {
String opName = "Fast " + timeWindows + " of " + functionName;
- if (function instanceof ReduceFunction) {
- @SuppressWarnings("unchecked")
- ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
- @SuppressWarnings("unchecked")
- OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
- new AggregatingProcessingTimeWindowOperator<>(
- reducer,
- input.getKeySelector(),
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- input.getType().createSerializer(getExecutionEnvironment().getConfig()),
- windowLength, windowSlide);
- return input.transform(opName, resultType, op);
- }
- else if (function instanceof WindowFunction) {
- @SuppressWarnings("unchecked")
- WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
-
- OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
- wf, input.getKeySelector(),
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- input.getType().createSerializer(getExecutionEnvironment().getConfig()),
- windowLength, windowSlide);
- return input.transform(opName, resultType, op);
- }
+ @SuppressWarnings("unchecked")
+ ReduceFunction<T> reducer = (ReduceFunction<T>) function;
+
+ @SuppressWarnings("unchecked")
+ OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
+ new AggregatingProcessingTimeWindowOperator<>(
+ reducer,
+ input.getKeySelector(),
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+ windowLength, windowSlide);
+ return input.transform(opName, resultType, op);
+ }
+
+ return null;
+ }
+
+ private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
+ InternalWindowFunction<Iterable<T>, R, K, W> function,
+ TypeInformation<R> resultType,
+ String functionName) {
+
+ if (windowAssigner.getClass() == SlidingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
+ SlidingAlignedProcessingTimeWindows timeWindows = (SlidingAlignedProcessingTimeWindows) windowAssigner;
+ final long windowLength = timeWindows.getSize();
+ final long windowSlide = timeWindows.getSlide();
+
+ String opName = "Fast " + timeWindows + " of " + functionName;
+
+ @SuppressWarnings("unchecked")
+ InternalWindowFunction<Iterable<T>, R, K, TimeWindow> timeWindowFunction =
+ (InternalWindowFunction<Iterable<T>, R, K, TimeWindow>) function;
+
+ OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
+ timeWindowFunction, input.getKeySelector(),
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+ windowLength, windowSlide);
+ return input.transform(opName, resultType, op);
+ } else if (windowAssigner.getClass() == TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
+ TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner;
+ final long windowLength = timeWindows.getSize();
+ final long windowSlide = timeWindows.getSize();
+
+ String opName = "Fast " + timeWindows + " of " + functionName;
+
+ @SuppressWarnings("unchecked")
+ InternalWindowFunction<Iterable<T>, R, K, TimeWindow> timeWindowFunction =
+ (InternalWindowFunction<Iterable<T>, R, K, TimeWindow>) function;
+
+
+ OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
+ timeWindowFunction, input.getKeySelector(),
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+ windowLength, windowSlide);
+ return input.transform(opName, resultType, op);
}
return null;
}
+
public StreamExecutionEnvironment getExecutionEnvironment() {
return input.getExecutionEnvironment();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
new file mode 100644
index 0000000..e1bc759
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+
+@Internal
+public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
+ extends RichProcessWindowFunction<T, R, K, W>
+ implements OutputTypeConfigurable<R> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FoldFunction<T, ACC> foldFunction;
+ private final ProcessWindowFunction<ACC, R, K, W> windowFunction;
+
+ private byte[] serializedInitialValue;
+ private TypeSerializer<ACC> accSerializer;
+ private final TypeInformation<ACC> accTypeInformation;
+ private transient ACC initialValue;
+
+ public FoldApplyProcessWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction, TypeInformation<ACC> accTypeInformation) {
+ this.windowFunction = windowFunction;
+ this.foldFunction = foldFunction;
+ this.initialValue = initialValue;
+ this.accTypeInformation = accTypeInformation;
+ }
+
+ @Override
+ public void open(Configuration configuration) throws Exception {
+ FunctionUtils.openFunction(this.windowFunction, configuration);
+
+ if (serializedInitialValue == null) {
+ throw new RuntimeException("No initial value was serialized for the fold " +
+ "window function. Probably the setOutputType method was not called.");
+ }
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
+ DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
+ initialValue = accSerializer.deserialize(in);
+ }
+
+ @Override
+ public void close() throws Exception {
+ FunctionUtils.closeFunction(this.windowFunction);
+ }
+
+ @Override
+ public void setRuntimeContext(RuntimeContext t) {
+ super.setRuntimeContext(t);
+
+ FunctionUtils.setFunctionRuntimeContext(this.windowFunction, t);
+ }
+
+ @Override
+ public void process(K key, final Context context, Iterable<T> values, Collector<R> out) throws Exception {
+ ACC result = accSerializer.copy(initialValue);
+
+ for (T val : values) {
+ result = foldFunction.fold(result, val);
+ }
+
+ windowFunction.process(key, windowFunction.new Context() {
+ @Override
+ public W window() {
+ return context.window();
+ }
+ }, Collections.singletonList(result), out);
+ }
+
+ @Override
+ public void setOutputType(TypeInformation<R> outTypeInfo, ExecutionConfig executionConfig) {
+ accSerializer = accTypeInformation.createSerializer(executionConfig);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+
+ try {
+ accSerializer.serialize(initialValue, out);
+ } catch (IOException ioe) {
+ throw new RuntimeException("Unable to serialize initial value of type " +
+ initialValue.getClass().getSimpleName() + " of fold window function.", ioe);
+ }
+
+ serializedInitialValue = baos.toByteArray();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
new file mode 100644
index 0000000..9c48e24
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+/**
+ * Base abstract class for functions that are evaluated over keyed (grouped) windows using a context
+ * for retrieving extra information.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
+ * @param <W> The type of {@code Window} that this window function can be applied on.
+ */
+@PublicEvolving
+public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Evaluates the window and outputs none or several elements.
+ *
+ * @param key The key for which this window is evaluated.
+ * @param context The context in which the window is being evaluated.
+ * @param elements The elements in the window being evaluated.
+ * @param out A collector for emitting elements.
+ *
+ * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+ */
+ public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
+
+ /**
+ * The context holding window metadata.
+ */
+ public abstract class Context {
+ /**
+ * @return The window that is being evaluated.
+ */
+ public abstract W window();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
new file mode 100644
index 0000000..9ea1fdf
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+@Internal
+public class ReduceApplyProcessWindowFunction<K, W extends Window, T, R>
+ extends RichProcessWindowFunction<T, R, K, W> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ReduceFunction<T> reduceFunction;
+ private final ProcessWindowFunction<T, R, K, W> windowFunction;
+
+ public ReduceApplyProcessWindowFunction(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> windowFunction) {
+ this.windowFunction = windowFunction;
+ this.reduceFunction = reduceFunction;
+ }
+
+ @Override
+ public void process(K k, final Context context, Iterable<T> input, Collector<R> out) throws Exception {
+
+ T curr = null;
+ for (T val: input) {
+ if (curr == null) {
+ curr = val;
+ } else {
+ curr = reduceFunction.reduce(curr, val);
+ }
+ }
+ windowFunction.process(k, windowFunction.new Context() {
+ @Override
+ public W window() {
+ return context.window();
+ }
+ }, Collections.singletonList(curr), out);
+ }
+
+ @Override
+ public void open(Configuration configuration) throws Exception {
+ FunctionUtils.openFunction(this.windowFunction, configuration);
+ }
+
+ @Override
+ public void close() throws Exception {
+ FunctionUtils.closeFunction(this.windowFunction);
+ }
+
+ @Override
+ public void setRuntimeContext(RuntimeContext t) {
+ super.setRuntimeContext(t);
+
+ FunctionUtils.setFunctionRuntimeContext(this.windowFunction, t);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
new file mode 100644
index 0000000..ac55bc6
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Base rich abstract class for functions that are evaluated over keyed (grouped) windows using a context
+ * for passing extra information.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
+ * @param <W> The type of {@code Window} that this window function can be applied on.
+ */
+@PublicEvolving
+public abstract class RichProcessWindowFunction<IN, OUT, KEY, W extends Window>
+ extends ProcessWindowFunction<IN, OUT, KEY, W>
+ implements RichFunction {
+
+ private static final long serialVersionUID = 1L;
+
+
+ // --------------------------------------------------------------------------------------------
+ // Runtime context access
+ // --------------------------------------------------------------------------------------------
+
+ private transient RuntimeContext runtimeContext;
+
+ @Override
+ public void setRuntimeContext(RuntimeContext t) {
+ this.runtimeContext = t;
+ }
+
+ @Override
+ public RuntimeContext getRuntimeContext() {
+ if (this.runtimeContext != null) {
+ return this.runtimeContext;
+ } else {
+ throw new IllegalStateException("The runtime context has not been initialized.");
+ }
+ }
+
+ @Override
+ public IterationRuntimeContext getIterationRuntimeContext() {
+ if (this.runtimeContext == null) {
+ throw new IllegalStateException("The runtime context has not been initialized.");
+ } else if (this.runtimeContext instanceof IterationRuntimeContext) {
+ return (IterationRuntimeContext) this.runtimeContext;
+ } else {
+ throw new IllegalStateException("This stub is not part of an iteration step function.");
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Default life cycle methods
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void open(Configuration parameters) throws Exception {}
+
+ @Override
+ public void close() throws Exception {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index a252ece..87c5aca 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -20,8 +20,8 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.util.UnionIterator;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
@@ -36,7 +36,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory();
- private final WindowFunction<Type, Result, Key, Window> function;
+ private final InternalWindowFunction<Iterable<Type>, Result, Key, Window> function;
/**
* IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */
@@ -44,7 +44,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
// ------------------------------------------------------------------------
- public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, WindowFunction<Type, Result, Key, Window> function) {
+ public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, InternalWindowFunction<Iterable<Type>, Result, Key, Window> function) {
this.keySelector = keySelector;
this.function = function;
}
@@ -59,7 +59,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
}
@Override
- public void evaluateWindow(Collector<Result> out, TimeWindow window,
+ public void evaluateWindow(Collector<Result> out, final TimeWindow window,
AbstractStreamOperator<Result> operator) throws Exception
{
if (previousPanes.isEmpty()) {
@@ -86,7 +86,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
- private final WindowFunction<Type, Result, Key, Window> function;
+ private final InternalWindowFunction<Iterable<Type>, Result, Key, Window> function;
private final UnionIterator<Type> unionIterator;
@@ -99,7 +99,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
private Key currentKey;
- WindowFunctionTraversal(WindowFunction<Type, Result, Key, Window> function, TimeWindow window,
+ WindowFunctionTraversal(InternalWindowFunction<Iterable<Type>, Result, Key, Window> function, TimeWindow window,
Collector<Result> out, AbstractStreamOperator<Result> contextOperator) {
this.function = function;
this.out = out;
http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 7adaf13..094b34d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -23,22 +23,22 @@ import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.ArrayListSerializer;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import java.util.ArrayList;
@Internal
@Deprecated
public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
- extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, WindowFunction<IN, OUT, KEY, TimeWindow>> {
+ extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, InternalWindowFunction<Iterable<IN>, OUT, KEY, TimeWindow>> {
private static final long serialVersionUID = 7305948082830843475L;
-
+
public AccumulatingProcessingTimeWindowOperator(
- WindowFunction<IN, OUT, KEY, TimeWindow> function,
+ InternalWindowFunction<Iterable<IN>, OUT, KEY, TimeWindow> function,
KeySelector<IN, KEY> keySelector,
TypeSerializer<KEY> keySerializer,
TypeSerializer<IN> valueSerializer,
@@ -46,14 +46,14 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
long windowSlide)
{
super(function, keySelector, keySerializer,
- new ArrayListSerializer<IN>(valueSerializer), windowLength, windowSlide);
+ new ArrayListSerializer<>(valueSerializer), windowLength, windowSlide);
}
@Override
protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
@SuppressWarnings("unchecked")
- WindowFunction<IN, OUT, KEY, Window> windowFunction = (WindowFunction<IN, OUT, KEY, Window>) function;
-
+ InternalWindowFunction<Iterable<IN>, OUT, KEY, Window> windowFunction = (InternalWindowFunction<Iterable<IN>, OUT, KEY, Window>) function;
+
return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
new file mode 100644
index 0000000..de516a5
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+/**
+ * Internal window function for wrapping a {@link ProcessWindowFunction} that takes an {@code Iterable}
+ * when the window state also is an {@code Iterable}.
+ */
+public final class InternalIterableProcessWindowFunction<IN, OUT, KEY, W extends Window>
+ extends WrappingFunction<ProcessWindowFunction<IN, OUT, KEY, W>>
+ implements InternalWindowFunction<Iterable<IN>, OUT, KEY, W> {
+
+ private static final long serialVersionUID = 1L;
+
+ public InternalIterableProcessWindowFunction(ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction) {
+ super(wrappedFunction);
+ }
+
+ @Override
+ public void apply(KEY key, final W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+ ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
+ ProcessWindowFunction<IN, OUT, KEY, W>.Context context = wrappedFunction.new Context() {
+ @Override
+ public W window() {
+ return window;
+ }
+ };
+
+ wrappedFunction.process(key, context, input, out);
+ }
+
+ @Override
+ public RuntimeContext getRuntimeContext() {
+ throw new RuntimeException("This should never be called.");
+ }
+
+ @Override
+ public IterationRuntimeContext getIterationRuntimeContext() {
+ throw new RuntimeException("This should never be called.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
new file mode 100644
index 0000000..b28c208
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+/**
+ * Internal window function for wrapping a {@link ProcessWindowFunction} that takes an {@code Iterable}
+ * when the window state is a single value.
+ */
+public final class InternalSingleValueProcessWindowFunction<IN, OUT, KEY, W extends Window>
+ extends WrappingFunction<ProcessWindowFunction<IN, OUT, KEY, W>>
+ implements InternalWindowFunction<IN, OUT, KEY, W> {
+
+ private static final long serialVersionUID = 1L;
+
+ public InternalSingleValueProcessWindowFunction(ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction) {
+ super(wrappedFunction);
+ }
+
+ @Override
+ public void apply(KEY key, final W window, IN input, Collector<OUT> out) throws Exception {
+ ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
+ ProcessWindowFunction<IN, OUT, KEY, W>.Context context = wrappedFunction.new Context() {
+ @Override
+ public W window() {
+ return window;
+ }
+ };
+
+ wrappedFunction.process(key, context, Collections.singletonList(input), out);
+ }
+
+ @Override
+ public RuntimeContext getRuntimeContext() {
+ throw new RuntimeException("This should never be called.");
+ }
+
+ @Override
+ public IterationRuntimeContext getIterationRuntimeContext() {
+ throw new RuntimeException("This should never be called.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
new file mode 100644
index 0000000..af5c77a
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FoldApplyProcessWindowFunctionTest {
+
+ /**
+ * Tests that the FoldApplyProcessWindowFunction gets the output type serializer set by the
+ * StreamGraphGenerator and that it computes the correct result.
+ */
+ @Test
+ public void testFoldWindowFunctionOutputTypeConfigurable() throws Exception{
+ StreamExecutionEnvironment env = new DummyStreamExecutionEnvironment();
+
+ List<StreamTransformation<?>> transformations = new ArrayList<>();
+
+ int initValue = 1;
+
+ FoldApplyProcessWindowFunction<Integer, TimeWindow, Integer, Integer, Integer> foldWindowFunction = new FoldApplyProcessWindowFunction<>(
+ initValue,
+ new FoldFunction<Integer, Integer>() {
+ @Override
+ public Integer fold(Integer accumulator, Integer value) throws Exception {
+ return accumulator + value;
+ }
+
+ },
+ new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
+ @Override
+ public void process(Integer integer,
+ Context context,
+ Iterable<Integer> input,
+ Collector<Integer> out) throws Exception {
+ for (Integer in: input) {
+ out.collect(in);
+ }
+ }
+ },
+ BasicTypeInfo.INT_TYPE_INFO
+ );
+
+ AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>(
+ new InternalIterableProcessWindowFunction<>(foldWindowFunction),
+ new KeySelector<Integer, Integer>() {
+ private static final long serialVersionUID = -7951310554369722809L;
+
+ @Override
+ public Integer getKey(Integer value) throws Exception {
+ return value;
+ }
+ },
+ IntSerializer.INSTANCE,
+ IntSerializer.INSTANCE,
+ 3000,
+ 3000
+ );
+
+ SourceFunction<Integer> sourceFunction = new SourceFunction<Integer>(){
+
+ private static final long serialVersionUID = 8297735565464653028L;
+
+ @Override
+ public void run(SourceContext<Integer> ctx) throws Exception {
+
+ }
+
+ @Override
+ public void cancel() {
+
+ }
+ };
+
+ SourceTransformation<Integer> source = new SourceTransformation<>("", new StreamSource<>(sourceFunction), BasicTypeInfo.INT_TYPE_INFO, 1);
+
+ transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
+
+ StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations);
+
+ List<Integer> result = new ArrayList<>();
+ List<Integer> input = new ArrayList<>();
+ List<Integer> expected = new ArrayList<>();
+
+ input.add(1);
+ input.add(2);
+ input.add(3);
+
+ for (int value : input) {
+ initValue += value;
+ }
+
+ expected.add(initValue);
+
+ foldWindowFunction.process(0, foldWindowFunction.new Context() {
+ @Override
+ public TimeWindow window() {
+ return new TimeWindow(0, 1);
+ }
+ }, input, new ListCollector<>(result));
+
+ Assert.assertEquals(expected, result);
+ }
+
+ public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
+
+ @Override
+ public JobExecutionResult execute(String jobName) throws Exception {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
index 91ec427..fecd440 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.util.Collector;
import org.junit.Test;
import org.junit.Assert;
@@ -81,19 +82,20 @@ public class FoldApplyWindowFunctionTest {
);
AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>(
- foldWindowFunction,
- new KeySelector<Integer, Integer>() {
- private static final long serialVersionUID = -7951310554369722809L;
-
- @Override
- public Integer getKey(Integer value) throws Exception {
- return value;
- }
- },
- IntSerializer.INSTANCE,
- IntSerializer.INSTANCE,
- 3000,
- 3000
+ new InternalIterableWindowFunction<>(
+ foldWindowFunction),
+ new KeySelector<Integer, Integer>() {
+ private static final long serialVersionUID = -7951310554369722809L;
+
+ @Override
+ public Integer getKey(Integer value) throws Exception {
+ return value;
+ }
+ },
+ IntSerializer.INSTANCE,
+ IntSerializer.INSTANCE,
+ 3000,
+ 3000
);
SourceFunction<Integer> sourceFunction = new SourceFunction<Integer>(){
http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index f3c3423..3c73035 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -25,12 +25,15 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils;
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.util.Collector;
import org.hamcrest.collection.IsIterableContainingInOrder;
@@ -115,7 +118,48 @@ public class InternalWindowFunctionTest {
Collector<String> c = (Collector<String>) mock(Collector.class);
windowFunction.apply(42L, w, i, c);
- verify(mock).apply(42L, w, i, c);
+ verify(mock).apply(eq(42L), eq(w), eq(i), eq(c));
+
+ // check close
+ windowFunction.close();
+ verify(mock).close();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testInternalIterableProcessWindowFunction() throws Exception {
+
+ ProcessWindowFunctionMock mock = mock(ProcessWindowFunctionMock.class);
+ InternalIterableProcessWindowFunction<Long, String, Long, TimeWindow> windowFunction =
+ new InternalIterableProcessWindowFunction<>(mock);
+
+ // check setOutputType
+ TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
+ ExecutionConfig execConf = new ExecutionConfig();
+ execConf.setParallelism(42);
+
+ StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf);
+ verify(mock).setOutputType(stringType, execConf);
+
+ // check open
+ Configuration config = new Configuration();
+
+ windowFunction.open(config);
+ verify(mock).open(config);
+
+ // check setRuntimeContext
+ RuntimeContext rCtx = mock(RuntimeContext.class);
+
+ windowFunction.setRuntimeContext(rCtx);
+ verify(mock).setRuntimeContext(rCtx);
+
+ // check apply
+ TimeWindow w = mock(TimeWindow.class);
+ Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
+ Collector<String> c = (Collector<String>) mock(Collector.class);
+
+ windowFunction.apply(42L, w, i, c);
+ verify(mock).process(eq(42L), (ProcessWindowFunctionMock.Context) anyObject(), eq(i), eq(c));
// check close
windowFunction.close();
@@ -204,6 +248,59 @@ public class InternalWindowFunctionTest {
verify(mock).close();
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testInternalSingleValueProcessWindowFunction() throws Exception {
+
+ ProcessWindowFunctionMock mock = mock(ProcessWindowFunctionMock.class);
+ InternalSingleValueProcessWindowFunction<Long, String, Long, TimeWindow> windowFunction =
+ new InternalSingleValueProcessWindowFunction<>(mock);
+
+ // check setOutputType
+ TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
+ ExecutionConfig execConf = new ExecutionConfig();
+ execConf.setParallelism(42);
+
+ StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf);
+ verify(mock).setOutputType(stringType, execConf);
+
+ // check open
+ Configuration config = new Configuration();
+
+ windowFunction.open(config);
+ verify(mock).open(config);
+
+ // check setRuntimeContext
+ RuntimeContext rCtx = mock(RuntimeContext.class);
+
+ windowFunction.setRuntimeContext(rCtx);
+ verify(mock).setRuntimeContext(rCtx);
+
+ // check apply
+ TimeWindow w = mock(TimeWindow.class);
+ Collector<String> c = (Collector<String>) mock(Collector.class);
+
+ windowFunction.apply(42L, w, 23L, c);
+ verify(mock).process(eq(42L), (ProcessWindowFunctionMock.Context) anyObject(), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
+
+ // check close
+ windowFunction.close();
+ verify(mock).close();
+ }
+
+ public static class ProcessWindowFunctionMock
+ extends RichProcessWindowFunction<Long, String, Long, TimeWindow>
+ implements OutputTypeConfigurable<String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
+
+ @Override
+ public void process(Long aLong, Context context, Iterable<Long> input, Collector<String> out) throws Exception { }
+ }
+
public static class WindowFunctionMock
extends RichWindowFunction<Long, String, Long, TimeWindow>
implements OutputTypeConfigurable<String> {
@@ -214,7 +311,7 @@ public class InternalWindowFunctionTest {
public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
@Override
- public void apply(Long aLong, TimeWindow window, Iterable<Long> input, Collector<String> out) throws Exception { }
+ public void apply(Long aLong, TimeWindow w, Iterable<Long> input, Collector<String> out) throws Exception { }
}
public static class AllWindowFunctionMock
http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 255a20f..508d2e1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -36,8 +36,12 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -47,6 +51,9 @@ import org.apache.flink.util.Collector;
import org.junit.After;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
import java.util.ArrayList;
import java.util.Arrays;
@@ -64,10 +71,12 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@SuppressWarnings({"serial"})
+@PrepareForTest(InternalIterableWindowFunction.class)
+@RunWith(PowerMockRunner.class)
public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
@SuppressWarnings("unchecked")
- private final WindowFunction<String, String, String, TimeWindow> mockFunction = mock(WindowFunction.class);
+ private final InternalIterableWindowFunction<String, String, String, TimeWindow> mockFunction = mock(InternalIterableWindowFunction.class);
@SuppressWarnings("unchecked")
private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
@@ -79,26 +88,34 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}
};
- private final WindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction =
- new WindowFunction<Integer, Integer, Integer, TimeWindow>()
- {
- @Override
- public void apply(Integer key,
- TimeWindow window,
- Iterable<Integer> values,
- Collector<Integer> out) {
- for (Integer val : values) {
- assertEquals(key, val);
- out.collect(val);
- }
- }
- };
+ private final InternalIterableWindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction =
+ new InternalIterableWindowFunction<>(new WindowFunction<Integer, Integer, Integer, TimeWindow>() {
+ @Override
+ public void apply(Integer key, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
+ for (Integer val : values) {
+ assertEquals(key, val);
+ out.collect(val);
+ }
+ }
+ });
+
+ private final InternalIterableProcessWindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityProcessFunction =
+ new InternalIterableProcessWindowFunction<>(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
+ @Override
+ public void process(Integer key, Context context, Iterable<Integer> values, Collector<Integer> out) throws Exception {
+ for (Integer val : values) {
+ assertEquals(key, val);
+ out.collect(val);
+ }
+ }
+ });
// ------------------------------------------------------------------------
public AccumulatingAlignedProcessingTimeWindowOperatorTest() {
ClosureCleaner.clean(identitySelector, false);
ClosureCleaner.clean(validatingIdentityFunction, false);
+ ClosureCleaner.clean(validatingIdentityProcessFunction, false);
}
// ------------------------------------------------------------------------
@@ -281,6 +298,50 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}
@Test
+ public void testTumblingWindowWithProcessFunction() throws Exception {
+ try {
+ final int windowSize = 50;
+
+ // tumbling window that triggers every 50 milliseconds
+ AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+ new AccumulatingProcessingTimeWindowOperator<>(
+ validatingIdentityProcessFunction, identitySelector,
+ IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+ windowSize, windowSize);
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.open();
+
+ final int numElements = 1000;
+
+ long currentTime = 0;
+
+ for (int i = 0; i < numElements; i++) {
+ testHarness.processElement(new StreamRecord<>(i));
+ currentTime = currentTime + 10;
+ testHarness.setProcessingTime(currentTime);
+ }
+
+
+ List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+ assertEquals(numElements, result.size());
+
+ Collections.sort(result);
+ for (int i = 0; i < numElements; i++) {
+ assertEquals(i, result.get(i).intValue());
+ }
+
+ testHarness.close();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
public void testSlidingWindow() throws Exception {
// tumbling window that triggers every 20 milliseconds
@@ -333,6 +394,58 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}
@Test
+ public void testSlidingWindowWithProcessFunction() throws Exception {
+
+ // sliding window (150 msecs) every 50 msecs
+ AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+ new AccumulatingProcessingTimeWindowOperator<>(
+ validatingIdentityProcessFunction, identitySelector,
+ IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.open();
+
+ final int numElements = 1000;
+
+ long currentTime = 0;
+
+ for (int i = 0; i < numElements; i++) {
+ testHarness.processElement(new StreamRecord<>(i));
+ currentTime = currentTime + 10;
+ testHarness.setProcessingTime(currentTime);
+ }
+
+ // get and verify the result
+ List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+
+ // if we kept this running, each element would be in the result three times (for each slide).
+ // we are closing the window before the final panes are through three times, so we may have fewer
+ // elements.
+ if (result.size() < numElements || result.size() > 3 * numElements) {
+ fail("Wrong number of results: " + result.size());
+ }
+
+ Collections.sort(result);
+ int lastNum = -1;
+ int lastCount = -1;
+
+ for (int num : result) {
+ if (num == lastNum) {
+ lastCount++;
+ assertTrue(lastCount <= 3);
+ }
+ else {
+ lastNum = num;
+ lastCount = 1;
+ }
+ }
+
+ testHarness.close();
+ }
+
+ @Test
public void testTumblingWindowSingleElements() throws Exception {
try {
@@ -379,7 +492,55 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
fail(e.getMessage());
}
}
-
+
+ @Test
+ public void testTumblingWindowSingleElementsWithProcessFunction() throws Exception {
+
+ try {
+
+ // tumbling window that triggers every 50 milliseconds
+ AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+ new AccumulatingProcessingTimeWindowOperator<>(
+ validatingIdentityProcessFunction, identitySelector,
+ IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50);
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.open();
+
+ testHarness.setProcessingTime(0);
+
+ testHarness.processElement(new StreamRecord<>(1));
+ testHarness.processElement(new StreamRecord<>(2));
+
+ testHarness.setProcessingTime(50);
+
+ testHarness.processElement(new StreamRecord<>(3));
+ testHarness.processElement(new StreamRecord<>(4));
+ testHarness.processElement(new StreamRecord<>(5));
+
+ testHarness.setProcessingTime(100);
+
+ testHarness.processElement(new StreamRecord<>(6));
+
+ testHarness.setProcessingTime(200);
+
+
+ List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+ assertEquals(6, result.size());
+
+ Collections.sort(result);
+ assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result);
+
+ testHarness.close();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
@Test
public void testSlidingWindowSingleElements() throws Exception {
try {
@@ -420,6 +581,126 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}
@Test
+ public void testSlidingWindowSingleElementsWithProcessFunction() throws Exception {
+ try {
+
+ // sliding window (150 msecs) every 50 msecs
+ AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+ new AccumulatingProcessingTimeWindowOperator<>(
+ validatingIdentityProcessFunction, identitySelector,
+ IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.setProcessingTime(0);
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(1));
+ testHarness.processElement(new StreamRecord<>(2));
+
+ testHarness.setProcessingTime(50);
+ testHarness.setProcessingTime(100);
+ testHarness.setProcessingTime(150);
+
+ List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+
+ assertEquals(6, result.size());
+
+ Collections.sort(result);
+ assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
+
+ testHarness.close();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void checkpointRestoreWithPendingWindowTumblingWithProcessFunction() {
+ try {
+ final int windowSize = 200;
+
+ // tumbling window that triggers every 200 milliseconds
+ AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+ new AccumulatingProcessingTimeWindowOperator<>(
+ validatingIdentityProcessFunction, identitySelector,
+ IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+ windowSize, windowSize);
+
+ OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+ new OneInputStreamOperatorTestHarness<>(op);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.setProcessingTime(0);
+
+ // inject some elements
+ final int numElementsFirst = 700;
+ final int numElements = 1000;
+ for (int i = 0; i < numElementsFirst; i++) {
+ testHarness.processElement(new StreamRecord<>(i));
+ }
+
+ // draw a snapshot and dispose the window
+ int beforeSnapShot = testHarness.getOutput().size();
+ StreamStateHandle state = testHarness.snapshotLegacy(1L, System.currentTimeMillis());
+ List<Integer> resultAtSnapshot = extractFromStreamRecords(testHarness.getOutput());
+ int afterSnapShot = testHarness.getOutput().size();
+ assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot);
+ assertTrue(afterSnapShot <= numElementsFirst);
+
+ // inject some random elements, which should not show up in the state
+ for (int i = 0; i < 300; i++) {
+ testHarness.processElement(new StreamRecord<>(i + numElementsFirst));
+ }
+
+ testHarness.close();
+ op.dispose();
+
+ // re-create the operator and restore the state
+ op = new AccumulatingProcessingTimeWindowOperator<>(
+ validatingIdentityProcessFunction, identitySelector,
+ IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+ windowSize, windowSize);
+
+ testHarness = new OneInputStreamOperatorTestHarness<>(op);
+
+ testHarness.setup();
+ testHarness.restore(state);
+ testHarness.open();
+
+ // inject some more elements
+ for (int i = numElementsFirst; i < numElements; i++) {
+ testHarness.processElement(new StreamRecord<>(i));
+ }
+
+ testHarness.setProcessingTime(400);
+
+ // get and verify the result
+ List<Integer> finalResult = new ArrayList<>();
+ finalResult.addAll(resultAtSnapshot);
+ List<Integer> finalPartialResult = extractFromStreamRecords(testHarness.getOutput());
+ finalResult.addAll(finalPartialResult);
+ assertEquals(numElements, finalResult.size());
+
+ Collections.sort(finalResult);
+ for (int i = 0; i < numElements; i++) {
+ assertEquals(i, finalResult.get(i).intValue());
+ }
+ testHarness.close();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
public void checkpointRestoreWithPendingWindowTumbling() {
try {
final int windowSize = 200;
@@ -501,6 +782,98 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}
@Test
+ public void checkpointRestoreWithPendingWindowSlidingWithProcessFunction() {
+ try {
+ final int factor = 4;
+ final int windowSlide = 50;
+ final int windowSize = factor * windowSlide;
+
+ // sliding window (200 msecs) every 50 msecs
+ AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+ new AccumulatingProcessingTimeWindowOperator<>(
+ validatingIdentityProcessFunction, identitySelector,
+ IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+ windowSize, windowSlide);
+
+ OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+ new OneInputStreamOperatorTestHarness<>(op);
+
+ testHarness.setProcessingTime(0);
+
+ testHarness.setup();
+ testHarness.open();
+
+ // inject some elements
+ final int numElements = 1000;
+ final int numElementsFirst = 700;
+
+ for (int i = 0; i < numElementsFirst; i++) {
+ testHarness.processElement(new StreamRecord<>(i));
+ }
+
+ // draw a snapshot
+ List<Integer> resultAtSnapshot = extractFromStreamRecords(testHarness.getOutput());
+ int beforeSnapShot = testHarness.getOutput().size();
+ StreamStateHandle state = testHarness.snapshotLegacy(1L, System.currentTimeMillis());
+ int afterSnapShot = testHarness.getOutput().size();
+ assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot);
+
+ assertTrue(resultAtSnapshot.size() <= factor * numElementsFirst);
+
+ // inject the remaining elements - these should not influence the snapshot
+ for (int i = numElementsFirst; i < numElements; i++) {
+ testHarness.processElement(new StreamRecord<>(i));
+ }
+
+ testHarness.close();
+
+ // re-create the operator and restore the state
+ op = new AccumulatingProcessingTimeWindowOperator<>(
+ validatingIdentityProcessFunction, identitySelector,
+ IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+ windowSize, windowSlide);
+
+ testHarness = new OneInputStreamOperatorTestHarness<>(op);
+
+ testHarness.setup();
+ testHarness.restore(state);
+ testHarness.open();
+
+
+ // inject again the remaining elements
+ for (int i = numElementsFirst; i < numElements; i++) {
+ testHarness.processElement(new StreamRecord<>(i));
+ }
+
+ testHarness.setProcessingTime(50);
+ testHarness.setProcessingTime(100);
+ testHarness.setProcessingTime(150);
+ testHarness.setProcessingTime(200);
+ testHarness.setProcessingTime(250);
+ testHarness.setProcessingTime(300);
+ testHarness.setProcessingTime(350);
+
+ // get and verify the result
+ List<Integer> finalResult = new ArrayList<>(resultAtSnapshot);
+ List<Integer> finalPartialResult = extractFromStreamRecords(testHarness.getOutput());
+ finalResult.addAll(finalPartialResult);
+ assertEquals(factor * numElements, finalResult.size());
+
+ Collections.sort(finalResult);
+ for (int i = 0; i < factor * numElements; i++) {
+ assertEquals(i / factor, finalResult.get(i).intValue());
+ }
+
+ testHarness.close();
+ op.dispose();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
public void checkpointRestoreWithPendingWindowSliding() {
try {
final int factor = 4;
@@ -601,8 +974,12 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
// tumbling window that triggers every 20 milliseconds
AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
new AccumulatingProcessingTimeWindowOperator<>(
- new StatefulFunction(), identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50);
+ new InternalIterableProcessWindowFunction<>(new StatefulFunction()),
+ identitySelector,
+ IntSerializer.INSTANCE,
+ IntSerializer.INSTANCE,
+ 50,
+ 50);
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
@@ -661,7 +1038,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
// ------------------------------------------------------------------------
- private static class StatefulFunction extends RichWindowFunction<Integer, Integer, Integer, TimeWindow> {
+ private static class StatefulFunction extends RichProcessWindowFunction<Integer, Integer, Integer, TimeWindow> {
// we use a concurrent map here even though there is no concurrency, to
// get "volatile" style access to entries
@@ -677,8 +1054,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}
@Override
- public void apply(Integer key,
- TimeWindow window,
+ public void process(Integer key,
+ Context context,
Iterable<Integer> values,
Collector<Integer> out) throws Exception {
for (Integer i : values) {
[5/8] flink git commit: [FLINK-4997] [streaming] Introduce
ProcessWindowFunction
Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 104bc7b..a9c3ef6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -59,7 +60,9 @@ import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
@@ -432,6 +435,78 @@ public class WindowOperatorTest extends TestLogger {
@Test
@SuppressWarnings("unchecked")
+ public void testSessionWindowsWithProcessFunction() throws Exception {
+ closeCalled.set(0);
+
+ final int SESSION_SIZE = 3;
+
+ TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+ ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
+ inputType.createSerializer(new ExecutionConfig()));
+
+ WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
+ EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+ new TimeWindow.Serializer(),
+ new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ stateDesc,
+ new InternalIterableProcessWindowFunction<>(new SessionProcessWindowFunction()),
+ EventTimeTrigger.create(),
+ 0);
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ testHarness.open();
+
+ // add elements out-of-order
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
+
+ // do a snapshot, close and restore again
+ OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+ testHarness.close();
+ testHarness.setup();
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5501));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 6050));
+
+ testHarness.processWatermark(new Watermark(12000));
+
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), 5499));
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
+
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), 9049));
+ expectedOutput.add(new Watermark(12000));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 15000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), 15000));
+
+ testHarness.processWatermark(new Watermark(17999));
+
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), 17999));
+ expectedOutput.add(new Watermark(17999));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
+ testHarness.close();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
public void testReduceSessionWindows() throws Exception {
closeCalled.set(0);
@@ -500,6 +575,76 @@ public class WindowOperatorTest extends TestLogger {
testHarness.close();
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testReduceSessionWindowsWithProcessFunction() throws Exception {
+ closeCalled.set(0);
+
+ final int SESSION_SIZE = 3;
+
+ TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+ ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>(
+ "window-contents", new SumReducer(), inputType.createSerializer(new ExecutionConfig()));
+
+ WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
+ EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+ new TimeWindow.Serializer(),
+ new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ stateDesc,
+ new InternalSingleValueProcessWindowFunction<>(new ReducedProcessSessionWindowFunction()),
+ EventTimeTrigger.create(),
+ 0);
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ testHarness.open();
+
+ // add elements out-of-order
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
+
+ // do a snapshot, close and restore again
+ OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+ testHarness.close();
+ testHarness.setup();
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5501));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 6050));
+
+ testHarness.processWatermark(new Watermark(12000));
+
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), 5499));
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), 9049));
+ expectedOutput.add(new Watermark(12000));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 15000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 20), 15000));
+
+ testHarness.processWatermark(new Watermark(17999));
+
+ expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-30", 15000L, 18000L), 17999));
+ expectedOutput.add(new Watermark(17999));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
+ testHarness.close();
+ }
+
/**
* This tests whether merging works correctly with the CountTrigger.
* @throws Exception
@@ -2379,6 +2524,38 @@ public class WindowOperatorTest extends TestLogger {
}
}
+ public static class SessionProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(String key,
+ Context context,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple3<String, Long, Long>> out) throws Exception {
+ int sum = 0;
+ for (Tuple2<String, Integer> i: values) {
+ sum += i.f1;
+ }
+ String resultString = key + "-" + sum;
+ TimeWindow window = context.window();
+ out.collect(new Tuple3<>(resultString, window.getStart(), window.getEnd()));
+ }
+ }
+
+ public static class ReducedProcessSessionWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(String key,
+ Context context,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple3<String, Long, Long>> out) throws Exception {
+ TimeWindow window = context.window();
+ for (Tuple2<String, Integer> val: values) {
+ out.collect(new Tuple3<>(key + "-" + val.f1, window.getStart(), window.getEnd()));
+ }
+ }
+ }
public static class PointSessionWindows extends EventTimeSessionWindows {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
index 1e3e3d5..7d37d1a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
@@ -18,17 +18,22 @@
package org.apache.flink.test.streaming.runtime;
import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
@@ -114,6 +119,79 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
}
@Test
+ public void testFoldProcessWindow() throws Exception {
+
+ testResults = new ArrayList<>();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.setParallelism(1);
+
+ DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+ ctx.collect(Tuple2.of("a", 0));
+ ctx.collect(Tuple2.of("a", 1));
+ ctx.collect(Tuple2.of("a", 2));
+
+ ctx.collect(Tuple2.of("b", 3));
+ ctx.collect(Tuple2.of("b", 4));
+ ctx.collect(Tuple2.of("b", 5));
+
+ ctx.collect(Tuple2.of("a", 6));
+ ctx.collect(Tuple2.of("a", 7));
+ ctx.collect(Tuple2.of("a", 8));
+
+ // source is finite, so it will have an implicit MAX watermark when it finishes
+ }
+
+ @Override
+ public void cancel() {}
+
+ }).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());
+
+ source1
+ .keyBy(0)
+ .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+ .fold(Tuple2.of(0, "R:"), new FoldFunction<Tuple2<String, Integer>, Tuple2<Integer, String>>() {
+ @Override
+ public Tuple2<Integer, String> fold(Tuple2<Integer, String> accumulator, Tuple2<String, Integer> value) throws Exception {
+ accumulator.f1 += value.f0;
+ accumulator.f0 += value.f1;
+ return accumulator;
+ }
+ }, new ProcessWindowFunction<Tuple2<Integer, String>, Tuple3<String, Integer, Integer>, Tuple, TimeWindow>() {
+ @Override
+ public void process(Tuple tuple, Context context, Iterable<Tuple2<Integer, String>> elements, Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
+ int i = 0;
+ for (Tuple2<Integer, String> in : elements) {
+ out.collect(new Tuple3<>(in.f1, in.f0, i++));
+ }
+ }
+ })
+ .addSink(new SinkFunction<Tuple3<String, Integer, Integer>>() {
+ @Override
+ public void invoke(Tuple3<String, Integer, Integer> value) throws Exception {
+ testResults.add(value.toString());
+ }
+ });
+
+ env.execute("Fold Process Window Test");
+
+ List<String> expectedResult = Arrays.asList(
+ "(R:aaa,3,0)",
+ "(R:aaa,21,0)",
+ "(R:bbb,12,0)");
+
+ Collections.sort(expectedResult);
+ Collections.sort(testResults);
+
+ Assert.assertEquals(expectedResult, testResults);
+ }
+
+ @Test
public void testFoldAllWindow() throws Exception {
testResults = new ArrayList<>();
[2/8] flink git commit: [FLINK-5237] Consolidate and harmonize Window
Translation Tests
Posted by al...@apache.org.
[FLINK-5237] Consolidate and harmonize Window Translation Tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5368a7d3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5368a7d3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5368a7d3
Branch: refs/heads/master
Commit: 5368a7d32d96beb1b8298b87d9ea6d42ea306947
Parents: fe2a301
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Nov 24 08:14:48 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 17 17:15:51 2017 +0100
----------------------------------------------------------------------
.../operators/StateDescriptorPassingTest.java | 26 +
.../windowing/WindowTranslationTest.java | 718 +++++++++++++++--
.../ScalaProcessWindowFunctionWrapper.scala | 16 +-
.../api/scala/WindowTranslationTest.scala | 766 +++++++++++++++++--
4 files changed, 1420 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
index 26cb7ac..813ca96 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.time.Time;
@@ -136,6 +137,31 @@ public class StateDescriptorPassingTest {
}
@Test
+ public void testProcessWindowState() throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+
+ DataStream<File> src = env.fromElements(new File("/"));
+
+ SingleOutputStreamOperator<?> result = src
+ .keyBy(new KeySelector<File, String>() {
+ @Override
+ public String getKey(File value) {
+ return null;
+ }
+ })
+ .timeWindow(Time.milliseconds(1000))
+ .process(new ProcessWindowFunction<File, String, String, TimeWindow>() {
+ @Override
+ public void process(String s, Context ctx,
+ Iterable<File> input, Collector<String> out) {}
+ });
+
+ validateListStateDescriptorConfigured(result);
+ }
+
+ @Test
public void testFoldWindowAllState() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index f72a2f1..b899948 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -31,14 +31,17 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
@@ -112,7 +115,7 @@ public class WindowTranslationTest {
* in a {@code AggregatingState}.
*/
@Test(expected = UnsupportedOperationException.class)
- public void testAgrgegateWithRichFunctionFails() throws Exception {
+ public void testAggregateWithRichFunctionFails() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
@@ -405,6 +408,82 @@ public class WindowTranslationTest {
processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testReduceWithProcessWindowFunctionEventTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DummyReducer reducer = new DummyReducer();
+
+ DataStream<Tuple3<String, String, Integer>> window = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .reduce(reducer, new ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(String key,
+ Context ctx,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple3<String, String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+ processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testReduceWithProcessWindowFunctionProcessingTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple3<String, String, Integer>> window = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .reduce(new DummyReducer(), new ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(String tuple,
+ Context ctx,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple3<String, String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+ processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
/**
* Test for the deprecated .apply(Reducer, WindowFunction).
*/
@@ -447,6 +526,50 @@ public class WindowTranslationTest {
processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
+ /**
+ * Test for the deprecated .apply(Reducer, WindowFunction) in combination with an evictor.
+ */
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testApplyWithPreReducerAndEvictor() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DummyReducer reducer = new DummyReducer();
+
+ DataStream<Tuple3<String, String, Integer>> window = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .evictor(CountEvictor.of(100))
+ .apply(reducer, new WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void apply(String key,
+ TimeWindow window,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple3<String, String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+
// ------------------------------------------------------------------------
// Aggregate Translation Tests
// ------------------------------------------------------------------------
@@ -463,13 +586,13 @@ public class WindowTranslationTest {
.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.aggregate(new DummyAggregationFunction());
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
- WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
@@ -492,13 +615,13 @@ public class WindowTranslationTest {
.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.aggregate(new DummyAggregationFunction());
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
- WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
@@ -529,7 +652,7 @@ public class WindowTranslationTest {
OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
- WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
@@ -569,6 +692,66 @@ public class WindowTranslationTest {
operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
+ @Test
+ public void testAggregateWithProcessWindowFunctionEventTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DummyReducer reducer = new DummyReducer();
+
+ DataStream<Tuple3<String, String, Integer>> window = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction());
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+ (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
+
+ processElementAndEnsureOutput(
+ operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ public void testAggregateWithProcessWindowFunctionProcessingTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple3<String, String, Integer>> window = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction());
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+ (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+
+ Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
+
+ processElementAndEnsureOutput(
+ operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
// ------------------------------------------------------------------------
// Fold Translation Tests
// ------------------------------------------------------------------------
@@ -664,83 +847,269 @@ public class WindowTranslationTest {
@SuppressWarnings("rawtypes")
public void testFoldWithWindowFunctionProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void apply(String key,
+ TimeWindow window,
+ Iterable<Tuple3<String, String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple3<String, String, Integer> in : values) {
+ out.collect(new Tuple2<>(in.f0, in.f2));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testFoldWithProcessWindowFunctionEventTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .fold(new Tuple3<>("", "", 0), new DummyFolder(), new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(String key,
+ Context ctx,
+ Iterable<Tuple3<String, String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple3<String, String, Integer> in : values) {
+ out.collect(new Tuple2<>(in.f0, in.f2));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testFoldWithProcessWindowFunctionProcessingTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(String key,
+ Context ctx,
+ Iterable<Tuple3<String, String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple3<String, String, Integer> in : values) {
+ out.collect(new Tuple2<>(in.f0, in.f2));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testApplyWithPreFolderEventTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple3<String, String, Integer>> window = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .apply(new Tuple3<>("", "", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void apply(String key,
+ TimeWindow window,
+ Iterable<Tuple3<String, String, Integer>> values,
+ Collector<Tuple3<String, String, Integer>> out) throws Exception {
+ for (Tuple3<String, String, Integer> in : values) {
+ out.collect(new Tuple3<>(in.f0, in. f1, in.f2));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String,String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testApplyWithPreFolderAndEvictor() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple3<String, String, Integer>> window = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .evictor(CountEvictor.of(100))
+ .apply(new Tuple3<>("", "", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void apply(String key,
+ TimeWindow window,
+ Iterable<Tuple3<String, String, Integer>> values,
+ Collector<Tuple3<String, String, Integer>> out) throws Exception {
+ for (Tuple3<String, String, Integer> in : values) {
+ out.collect(new Tuple3<>(in.f0, in. f1, in.f2));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String,String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+
+ // ------------------------------------------------------------------------
+ // Apply Translation Tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testApplyEventTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
- DataStream<Tuple2<String, Integer>> window = source
+ DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(new TupleKeySelector())
- .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+ .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void apply(String key,
TimeWindow window,
- Iterable<Tuple3<String, String, Integer>> values,
+ Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
- for (Tuple3<String, String, Integer> in : values) {
- out.collect(new Tuple2<>(in.f0, in.f2));
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(in);
}
}
});
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
- (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
- Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
- Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
- Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
@Test
@SuppressWarnings("rawtypes")
- public void testApplyWithPreFolderEventTime() throws Exception {
+ public void testApplyProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
- DataStream<Tuple3<String, String, Integer>> window = source
+ DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(new TupleKeySelector())
- .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .apply(new Tuple3<>("", "", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+ .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void apply(String key,
TimeWindow window,
- Iterable<Tuple3<String, String, Integer>> values,
- Collector<Tuple3<String, String, Integer>> out) throws Exception {
- for (Tuple3<String, String, Integer> in : values) {
- out.collect(new Tuple3<>(in.f0, in. f1, in.f2));
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(in);
}
}
});
- OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
- (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String,String, Integer>> operator = transform.getOperator();
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
- Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
- Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
- Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+ Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
- }
- // ------------------------------------------------------------------------
- // Apply Translation Tests
- // ------------------------------------------------------------------------
+ }
@Test
@SuppressWarnings("rawtypes")
- public void testApplyEventTime() throws Exception {
+ public void testProcessEventTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
@@ -749,12 +1118,12 @@ public class WindowTranslationTest {
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(new TupleKeySelector())
.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+ .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
- public void apply(String key,
- TimeWindow window,
+ public void process(String key,
+ Context ctx,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
for (Tuple2<String, Integer> in : values) {
@@ -776,7 +1145,7 @@ public class WindowTranslationTest {
@Test
@SuppressWarnings("rawtypes")
- public void testApplyProcessingTimeTime() throws Exception {
+ public void testProcessProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
@@ -785,12 +1154,12 @@ public class WindowTranslationTest {
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(new TupleKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+ .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
- public void apply(String key,
- TimeWindow window,
+ public void process(String key,
+ Context ctx,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
for (Tuple2<String, Integer> in : values) {
@@ -903,6 +1272,43 @@ public class WindowTranslationTest {
@Test
@SuppressWarnings("rawtypes")
+ public void testProcessWithCustomTrigger() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .trigger(CountTrigger.of(1))
+ .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(String key,
+ Context ctx,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(in);
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
public void testReduceWithEvictor() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
@@ -930,6 +1336,121 @@ public class WindowTranslationTest {
}
@Test
+ @SuppressWarnings("rawtypes")
+ public void testReduceWithEvictorAndProcessFunction() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DummyReducer reducer = new DummyReducer();
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .keyBy(0)
+ .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .evictor(CountEvictor.of(100))
+ .reduce(
+ reducer,
+ new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+ @Override
+ public void process(
+ Tuple tuple,
+ Context context,
+ Iterable<Tuple2<String, Integer>> elements,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : elements) {
+ out.collect(in);
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof EvictingWindowOperator);
+ EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ public void testAggregateWithEvictor() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .keyBy(new TupleKeySelector())
+ .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .evictor(CountEvictor.of(100))
+ .aggregate(new DummyAggregationFunction());
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+ (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(
+ winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ public void testAggregateWithEvictorAndProcessFunction() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .keyBy(new TupleKeySelector())
+ .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .evictor(CountEvictor.of(100))
+ .aggregate(
+ new DummyAggregationFunction(),
+ new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+ @Override
+ public void process(
+ String s,
+ Context context,
+ Iterable<Tuple2<String, Integer>> elements,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : elements) {
+ out.collect(in);
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+ (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(
+ winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+
+ @Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testFoldWithEvictor() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -958,6 +1479,48 @@ public class WindowTranslationTest {
}
@Test
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void testFoldWithEvictorAndProcessFunction() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple3<String, String, Integer>> window1 = source
+ .keyBy(0)
+ .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .evictor(CountEvictor.of(100))
+ .fold(
+ new Tuple3<>("", "", 1),
+ new DummyFolder(),
+ new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, Tuple, TimeWindow>() {
+ @Override
+ public void process(
+ Tuple tuple,
+ Context context,
+ Iterable<Tuple3<String, String, Integer>> elements,
+ Collector<Tuple3<String, String, Integer>> out) throws Exception {
+ for (Tuple3<String, String, Integer> in : elements) {
+ out.collect(in);
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof EvictingWindowOperator);
+ EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ winOperator.setOutputType((TypeInformation) window1.getType(), new ExecutionConfig());
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
@SuppressWarnings("rawtypes")
public void testApplyWithEvictor() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -996,6 +1559,45 @@ public class WindowTranslationTest {
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testProcessWithEvictor() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .keyBy(new TupleKeySelector())
+ .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .trigger(CountTrigger.of(1))
+ .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
+ .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(String key,
+ Context ctx,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(in);
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof EvictingWindowOperator);
+ EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+ Assert.assertTrue(winOperator.getEvictor() instanceof TimeEvictor);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
/**
* Ensure that we get some output from the given operator when pushing in an element and
* setting watermark and processing time to {@code Long.MAX_VALUE}.
@@ -1012,6 +1614,12 @@ public class WindowTranslationTest {
keySelector,
keyType);
+ if (operator instanceof OutputTypeConfigurable) {
+ // use a dummy type since window functions just need the ExecutionConfig
+ // this is also only needed for Fold, which we're getting rid of soon.
+ ((OutputTypeConfigurable) operator).setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
+ }
+
testHarness.open();
testHarness.setProcessingTime(0);
@@ -1050,7 +1658,7 @@ public class WindowTranslationTest {
}
}
- private static class DummyAggregationFunction
+ private static class DummyAggregationFunction
implements AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {
@Override
@@ -1096,7 +1704,7 @@ public class WindowTranslationTest {
}
}
- private static class TestWindowFunction
+ private static class TestWindowFunction
implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow> {
@Override
@@ -1111,6 +1719,22 @@ public class WindowTranslationTest {
}
}
+ private static class TestProcessWindowFunction
+ extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow> {
+
+ @Override
+ public void process(String key,
+ Context ctx,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple3<String, String, Integer>> out) throws Exception {
+
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+ }
+ }
+ }
+
+
private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index 4a20371..23293a6 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -18,8 +18,6 @@
package org.apache.flink.streaming.api.scala.function.util
-import org.apache.flink.api.common.functions.{IterationRuntimeContext, RuntimeContext}
-import org.apache.flink.api.java.operators.translation.WrappingFunction
import org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => JProcessWindowFunction}
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.Window
@@ -37,8 +35,7 @@ import scala.collection.JavaConverters._
*/
final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
private[this] val func: ProcessWindowFunction[IN, OUT, KEY, W])
- extends WrappingFunction[ProcessWindowFunction[IN, OUT, KEY, W]](func)
- with JProcessWindowFunctionTrait[IN, OUT, KEY, W] {
+ extends JProcessWindowFunction[IN, OUT, KEY, W] {
override def process(
key: KEY,
@@ -50,15 +47,4 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
}
func.process(key, ctx, elements.asScala, out)
}
-
- override def getRuntimeContext: RuntimeContext = {
- throw new RuntimeException("This should never be called")
- }
-
- override def getIterationRuntimeContext: IterationRuntimeContext = {
- throw new RuntimeException("This should never be called")
- }
}
-
-private trait JProcessWindowFunctionTrait[IN, OUT, KEY, W]
- extends JProcessWindowFunction[IN, OUT, KEY, W]
[3/8] flink git commit: [FLINK-4997] Add doc for ProcessWindowFunction
Posted by al...@apache.org.
[FLINK-4997] Add doc for ProcessWindowFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4f047e13
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4f047e13
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4f047e13
Branch: refs/heads/master
Commit: 4f047e13518ad2eb493903179e38eb174a37994c
Parents: 86dff0e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Feb 9 11:56:46 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 17 17:15:51 2017 +0100
----------------------------------------------------------------------
docs/dev/windows.md | 105 +++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 105 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4f047e13/docs/dev/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/windows.md b/docs/dev/windows.md
index f8be08f..73f348e 100644
--- a/docs/dev/windows.md
+++ b/docs/dev/windows.md
@@ -565,6 +565,108 @@ The example shows a `WindowFunction` to count the elements in a window. In addit
<span class="label label-danger">Attention</span> Note that using `WindowFunction` for simple aggregates such as count is quite inefficient. The next section shows how a `ReduceFunction` can be combined with a `WindowFunction` to get both incremental aggregation and the added information of a `WindowFunction`.
+### ProcessWindowFunction
+
+In places where a `WindowFunction` can be used you can also use a `ProcessWindowFunction`. This
+is very similar to `WindowFunction`, except that the interface allows querying more information
+about the context in which the window evaluation happens.
+
+This is the `ProcessWindowFunction` interface:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
+
+ /**
+ * Evaluates the window and outputs none or several elements.
+ *
+ * @param key The key for which this window is evaluated.
+ * @param context The context in which the window is being evaluated.
+ * @param elements The elements in the window being evaluated.
+ * @param out A collector for emitting elements.
+ *
+ * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+ */
+ public abstract void process(
+ KEY key,
+ Context context,
+ Iterable<IN> elements,
+ Collector<OUT> out) throws Exception;
+
+ /**
+ * The context holding window metadata
+ */
+ public abstract class Context {
+ /**
+ * @return The window that is being evaluated.
+ */
+ public abstract W window();
+ }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {
+
+ /**
+ * Evaluates the window and outputs none or several elements.
+ *
+ * @param key The key for which this window is evaluated.
+ * @param context The context in which the window is being evaluated.
+ * @param elements The elements in the window being evaluated.
+ * @param out A collector for emitting elements.
+ * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+ */
+ @throws[Exception]
+ def process(
+ key: KEY,
+ context: Context,
+ elements: Iterable[IN],
+ out: Collector[OUT])
+
+ /**
+ * The context holding window metadata
+ */
+ abstract class Context {
+ /**
+ * @return The window that is being evaluated.
+ */
+ def window: W
+ }
+}
+{% endhighlight %}
+</div>
+</div>
+
+It can be used like this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple2<String, Long>> input = ...;
+
+input
+ .keyBy(<key selector>)
+ .window(<window assigner>)
+ .process(new MyProcessWindowFunction());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[(String, Long)] = ...
+
+input
+ .keyBy(<key selector>)
+ .window(<window assigner>)
+ .process(new MyProcessWindowFunction())
+{% endhighlight %}
+</div>
+</div>
+
### WindowFunction with Incremental Aggregation
A `WindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction` to
@@ -573,6 +675,9 @@ When the window is closed, the `WindowFunction` will be provided with the aggreg
This allows to incrementally compute windows while having access to the
additional window meta information of the `WindowFunction`.
+<span class="label label-info">Note</span> You can also use `ProcessWindowFunction` instead of
+`WindowFunction` for incremental window aggregation.
+
#### Incremental Window Aggregation with FoldFunction
The following example shows how an incremental `FoldFunction` can be combined with
[7/8] flink git commit: [FLINK-4997] Add ProcessWindowFunction
support for .aggregate()
Posted by al...@apache.org.
[FLINK-4997] Add ProcessWindowFunction support for .aggregate()
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe2a3016
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe2a3016
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe2a3016
Branch: refs/heads/master
Commit: fe2a3016f98e45d0c94a3fa1ed8c17b89a516859
Parents: 4f047e1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Feb 7 14:38:25 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 17 17:15:51 2017 +0100
----------------------------------------------------------------------
.../api/datastream/WindowedStream.java | 128 +++++++++++++++++++
.../InternalAggregateProcessWindowFunction.java | 84 ++++++++++++
.../functions/InternalWindowFunctionTest.java | 104 +++++++++++++++
.../streaming/api/scala/WindowedStream.scala | 30 +++++
4 files changed, 346 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fe2a3016/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 45eaae5..6809df0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -64,6 +64,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
@@ -906,6 +907,133 @@ public class WindowedStream<T, K, W extends Window> {
return input.transform(opName, resultType, operator);
}
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>Arriving data is incrementally aggregated using the given aggregate function. This means
+ * that the window function typically has only a single value to process when called.
+ *
+ * @param aggFunction The aggregate function that is used for incremental aggregation.
+ * @param windowFunction The window function.
+ *
+ * @return The data stream that is the result of applying the window function to the window.
+ *
+ * @param <ACC> The type of the AggregateFunction's accumulator
+ * @param <V> The type of AggregateFunction's result, and the WindowFunction's input
+ * @param <R> The type of the elements in the resulting stream, equal to the
+ * WindowFunction's result type
+ */
+ @PublicEvolving
+ public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
+ AggregateFunction<T, ACC, V> aggFunction,
+ ProcessWindowFunction<V, R, K, W> windowFunction) {
+
+ checkNotNull(aggFunction, "aggFunction");
+ checkNotNull(windowFunction, "windowFunction");
+
+ TypeInformation<ACC> accumulatorType = TypeExtractor.getAggregateFunctionAccumulatorType(
+ aggFunction, input.getType(), null, false);
+
+ TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
+ aggFunction, input.getType(), null, false);
+
+ TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+ windowFunction, ProcessWindowFunction.class, true, true, aggResultType, null, false);
+
+ return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
+ }
+
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>Arriving data is incrementally aggregated using the given aggregate function. This means
+ * that the window function typically has only a single value to process when called.
+ *
+ * @param aggregateFunction The aggregation function that is used for incremental aggregation.
+ * @param windowFunction The window function.
+ * @param accumulatorType Type information for the internal accumulator type of the aggregation function
+ * @param resultType Type information for the result type of the window function
+ *
+ * @return The data stream that is the result of applying the window function to the window.
+ *
+ * @param <ACC> The type of the AggregateFunction's accumulator
+ * @param <V> The type of AggregateFunction's result, and the WindowFunction's input
+ * @param <R> The type of the elements in the resulting stream, equal to the
+ * WindowFunction's result type
+ */
+ @PublicEvolving
+ public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
+ AggregateFunction<T, ACC, V> aggregateFunction,
+ ProcessWindowFunction<V, R, K, W> windowFunction,
+ TypeInformation<ACC> accumulatorType,
+ TypeInformation<V> aggregateResultType,
+ TypeInformation<R> resultType) {
+
+ checkNotNull(aggregateFunction, "aggregateFunction");
+ checkNotNull(windowFunction, "windowFunction");
+ checkNotNull(accumulatorType, "accumulatorType");
+ checkNotNull(aggregateResultType, "aggregateResultType");
+ checkNotNull(resultType, "resultType");
+
+ if (aggregateFunction instanceof RichFunction) {
+ throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
+ }
+
+ //clean the closures
+ windowFunction = input.getExecutionEnvironment().clean(windowFunction);
+ aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction);
+
+ String callLocation = Utils.getCallLocationName();
+ String udfName = "WindowedStream." + callLocation;
+
+ String opName;
+ KeySelector<T, K> keySel = input.getKeySelector();
+
+ OneInputStreamOperator<T, R> operator;
+
+ if (evictor != null) {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+ (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+ ListStateDescriptor<StreamRecord<T>> stateDesc =
+ new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+ opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+ operator = new EvictingWindowOperator<>(windowAssigner,
+ windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+ keySel,
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ stateDesc,
+ new InternalAggregateProcessWindowFunction<>(aggregateFunction, windowFunction),
+ trigger,
+ evictor,
+ allowedLateness);
+
+ } else {
+ AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<>("window-contents",
+ aggregateFunction, accumulatorType.createSerializer(getExecutionEnvironment().getConfig()));
+
+ opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+ operator = new WindowOperator<>(windowAssigner,
+ windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+ keySel,
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ stateDesc,
+ new InternalSingleValueProcessWindowFunction<>(windowFunction),
+ trigger,
+ allowedLateness);
+ }
+
+ return input.transform(opName, resultType, operator);
+ }
+
// ------------------------------------------------------------------------
// Window Function (apply)
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fe2a3016/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
new file mode 100644
index 0000000..433da9b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+/**
+ * Internal window function for wrapping a {@link ProcessWindowFunction} that takes an
+ * {@code Iterable} and an {@link AggregateFunction}.
+ *
+ * @param <K> The key type
+ * @param <W> The window type
+ * @param <T> The type of the input to the AggregateFunction
+ * @param <ACC> The type of the AggregateFunction's accumulator
+ * @param <V> The type of the AggregateFunction's result, and the input to the WindowFunction
+ * @param <R> The result type of the WindowFunction
+ */
+public final class InternalAggregateProcessWindowFunction<T, ACC, V, R, K, W extends Window>
+ extends WrappingFunction<ProcessWindowFunction<V, R, K, W>>
+ implements InternalWindowFunction<Iterable<T>, R, K, W> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final AggregateFunction<T, ACC, V> aggFunction;
+
+ public InternalAggregateProcessWindowFunction(
+ AggregateFunction<T, ACC, V> aggFunction,
+ ProcessWindowFunction<V, R, K, W> windowFunction) {
+ super(windowFunction);
+ this.aggFunction = aggFunction;
+ }
+
+ @Override
+ public void apply(K key, final W window, Iterable<T> input, Collector<R> out) throws Exception {
+ ProcessWindowFunction<V, R, K, W> wrappedFunction = this.wrappedFunction;
+ ProcessWindowFunction<V, R, K, W>.Context context = wrappedFunction.new Context() {
+ @Override
+ public W window() {
+ return window;
+ }
+ };
+
+ final ACC acc = aggFunction.createAccumulator();
+
+ for (T val : input) {
+ aggFunction.add(val, acc);
+ }
+
+ wrappedFunction.process(key, context, Collections.singletonList(aggFunction.getResult(acc)), out);
+ }
+
+ @Override
+ public RuntimeContext getRuntimeContext() {
+ throw new RuntimeException("This should never be called.");
+ }
+
+ @Override
+ public IterationRuntimeContext getIterationRuntimeContext() {
+ throw new RuntimeException("This should never be called.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fe2a3016/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index 3c73035..e49a496 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.operators.windowing.functions;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -29,6 +30,7 @@ import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunct
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
@@ -39,6 +41,17 @@ import org.apache.flink.util.Collector;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.junit.Test;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.collection.IsMapContaining.hasEntry;
+import static org.hamcrest.core.AllOf.allOf;
import static org.mockito.Mockito.*;
public class InternalWindowFunctionTest {
@@ -288,6 +301,84 @@ public class InternalWindowFunctionTest {
verify(mock).close();
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testInternalAggregateProcessWindowFunction() throws Exception {
+
+ AggregateProcessWindowFunctionMock mock = mock(AggregateProcessWindowFunctionMock.class);
+
+ InternalAggregateProcessWindowFunction<Long, Set<Long>, Map<Long, Long>, String, Long, TimeWindow> windowFunction =
+ new InternalAggregateProcessWindowFunction<>(new AggregateFunction<Long, Set<Long>, Map<Long, Long>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Set<Long> createAccumulator() {
+ return new HashSet<>();
+ }
+
+ @Override
+ public void add(Long value, Set<Long> accumulator) {
+ accumulator.add(value);
+ }
+
+ @Override
+ public Map<Long, Long> getResult(Set<Long> accumulator) {
+ Map<Long, Long> result = new HashMap<>();
+ for (Long in : accumulator) {
+ result.put(in, in);
+ }
+ return result;
+ }
+
+ @Override
+ public Set<Long> merge(Set<Long> a, Set<Long> b) {
+ a.addAll(b);
+ return a;
+ }
+ }, mock);
+
+ // check setOutputType
+ TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
+ ExecutionConfig execConf = new ExecutionConfig();
+ execConf.setParallelism(42);
+
+ StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf);
+ verify(mock).setOutputType(stringType, execConf);
+
+ // check open
+ Configuration config = new Configuration();
+
+ windowFunction.open(config);
+ verify(mock).open(config);
+
+ // check setRuntimeContext
+ RuntimeContext rCtx = mock(RuntimeContext.class);
+
+ windowFunction.setRuntimeContext(rCtx);
+ verify(mock).setRuntimeContext(rCtx);
+
+ // check apply
+ TimeWindow w = mock(TimeWindow.class);
+ Collector<String> c = (Collector<String>) mock(Collector.class);
+
+ List<Long> args = new LinkedList<>();
+ args.add(23L);
+ args.add(24L);
+
+ windowFunction.apply(42L, w, args, c);
+ verify(mock).process(
+ eq(42L),
+ (AggregateProcessWindowFunctionMock.Context) anyObject(),
+ (Iterable) argThat(containsInAnyOrder(allOf(
+ hasEntry(is(23L), is(23L)),
+ hasEntry(is(24L), is(24L))))),
+ eq(c));
+
+ // check close
+ windowFunction.close();
+ verify(mock).close();
+ }
+
public static class ProcessWindowFunctionMock
extends RichProcessWindowFunction<Long, String, Long, TimeWindow>
implements OutputTypeConfigurable<String> {
@@ -301,6 +392,19 @@ public class InternalWindowFunctionTest {
public void process(Long aLong, Context context, Iterable<Long> input, Collector<String> out) throws Exception { }
}
+ public static class AggregateProcessWindowFunctionMock
+ extends RichProcessWindowFunction<Map<Long, Long>, String, Long, TimeWindow>
+ implements OutputTypeConfigurable<String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
+
+ @Override
+ public void process(Long aLong, Context context, Iterable<Map<Long, Long>> input, Collector<String> out) throws Exception { }
+ }
+
public static class WindowFunctionMock
extends RichWindowFunction<Long, String, Long, TimeWindow>
implements OutputTypeConfigurable<String> {
http://git-wip-us.apache.org/repos/asf/flink/blob/fe2a3016/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 96ff334..a5fbeb9 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -326,6 +326,36 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
accumulatorType, aggregationResultType, resultType))
}
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * Arriving data is pre-aggregated using the given aggregation function.
+ *
+ * @param preAggregator The aggregation function that is used for pre-aggregation
+ * @param windowFunction The window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]
+ (preAggregator: AggregateFunction[T, ACC, V],
+ windowFunction: ProcessWindowFunction[V, R, K, W]): DataStream[R] = {
+
+ val cleanedPreAggregator = clean(preAggregator)
+ val cleanedWindowFunction = clean(windowFunction)
+
+ val applyFunction = new ScalaProcessWindowFunctionWrapper[V, R, K, W](cleanedWindowFunction)
+
+ val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
+ val aggregationResultType: TypeInformation[V] = implicitly[TypeInformation[V]]
+ val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
+
+ asScalaStream(javaStream.aggregate(
+ cleanedPreAggregator, applyFunction,
+ accumulatorType, aggregationResultType, resultType))
+ }
+
+
// ---------------------------- fold() ------------------------------------
/**
[8/8] flink git commit: [FLINK-4997] [streaming] Add
ProcessWindowFunction to Scala API
Posted by al...@apache.org.
[FLINK-4997] [streaming] Add ProcessWindowFunction to Scala API
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86dff0e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86dff0e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86dff0e6
Branch: refs/heads/master
Commit: 86dff0e6d584027994dd1320845169cc8b1a83d5
Parents: 1dcb2dc
Author: Ventura Del Monte <ve...@gmail.com>
Authored: Wed Nov 9 10:49:47 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 17 17:15:51 2017 +0100
----------------------------------------------------------------------
.../streaming/api/scala/WindowedStream.scala | 144 ++++++++++++++++++-
.../scala/function/ProcessWindowFunction.scala | 61 ++++++++
.../function/RichProcessWindowFunction.scala | 87 +++++++++++
.../ScalaProcessWindowFunctionWrapper.scala | 64 +++++++++
.../streaming/api/scala/WindowFoldITCase.scala | 64 ++++++++-
.../api/scala/WindowFunctionITCase.scala | 54 ++++++-
.../api/scala/WindowReduceITCase.scala | 64 ++++++++-
...ckingIdentityRichProcessWindowFunction.scala | 81 +++++++++++
8 files changed, 605 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index ab27820..96ff334 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -18,14 +18,16 @@
package org.apache.flink.streaming.api.scala
+import org.apache.flink.annotation.{Public, PublicEvolving}
+import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction}
import org.apache.flink.annotation.{PublicEvolving, Public}
import org.apache.flink.api.common.functions.{AggregateFunction, FoldFunction, ReduceFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream}
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
-import org.apache.flink.streaming.api.scala.function.WindowFunction
-import org.apache.flink.streaming.api.scala.function.util.{ScalaFoldFunction, ScalaReduceFunction, ScalaWindowFunction, ScalaWindowFunctionWrapper}
+import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
+import org.apache.flink.streaming.api.scala.function.util._
import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger
@@ -99,7 +101,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
// ------------------------------------------------------------------------
// --------------------------- reduce() -----------------------------------
-
+
/**
* Applies a reduce function to the window. The window function is called for each evaluation
* of the window for each key individually. The output of the reduce function is interpreted
@@ -198,10 +200,58 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
asScalaStream(javaStream.reduce(reducer, applyFunction, implicitly[TypeInformation[R]]))
}
+
+ /**
+ * Applies the given reduce function to each window. The reduced value of each window is
+ * then passed as input to the process window function. The output of the process window
+ * function is interpreted as a regular non-windowed stream.
+ *
+ * @param preAggregator The reduce function that is used for pre-aggregation
+ * @param function The process window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @PublicEvolving
+ def reduce[R: TypeInformation](
+ preAggregator: (T, T) => T,
+ function: ProcessWindowFunction[T, R, K, W]): DataStream[R] = {
+
+ val cleanedPreAggregator = clean(preAggregator)
+ val cleanedWindowFunction = clean(function)
+
+ val reducer = new ScalaReduceFunction[T](cleanedPreAggregator)
+ val applyFunction = new ScalaProcessWindowFunctionWrapper[T, R, K, W](cleanedWindowFunction)
+
+ val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
+ asScalaStream(javaStream.reduce(reducer, applyFunction, resultType))
+ }
+
+ /**
+ * Applies the given reduce function to each window. The reduced value of each window is
+ * then passed as input to the process window function. The output of the process window
+ * function is interpreted as a regular non-windowed stream.
+ *
+ * @param preAggregator The reduce function that is used for pre-aggregation
+ * @param function The process window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @PublicEvolving
+ def reduce[R: TypeInformation](
+ preAggregator: ReduceFunction[T],
+ function: ProcessWindowFunction[T, R, K, W]): DataStream[R] = {
+
+ val cleanedPreAggregator = clean(preAggregator)
+ val cleanedWindowFunction = clean(function)
+
+ val applyFunction = new ScalaProcessWindowFunctionWrapper[T, R, K, W](cleanedWindowFunction)
+
+ val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
+ asScalaStream(javaStream.reduce(cleanedPreAggregator, applyFunction, resultType))
+ }
+
// -------------------------- aggregate() ---------------------------------
/**
- * Applies the given aggregation function to each window and key. The aggregation function
+ * Applies the given aggregation function to each window and key. The aggregation function
* is called for each element, aggregating values incrementally and keeping the state to
* one accumulator per key and window.
*
@@ -213,7 +263,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
-
+
asScalaStream(javaStream.aggregate(
clean(aggregateFunction), accumulatorType, resultType))
}
@@ -241,7 +291,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
val aggregationResultType: TypeInformation[V] = implicitly[TypeInformation[V]]
val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
-
+
asScalaStream(javaStream.aggregate(
cleanedPreAggregator, applyFunction,
accumulatorType, aggregationResultType, resultType))
@@ -277,7 +327,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
}
// ---------------------------- fold() ------------------------------------
-
+
/**
* Applies the given fold function to each window. The window function is called for each
* evaluation of the window for each key individually. The output of the reduce function is
@@ -379,9 +429,89 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
asScalaStream(javaStream.fold(initialValue, folder, applyFunction, accType, resultType))
}
+
+ /**
+ * Applies the given fold function to each window. The folded value of each window is
+ * then passed as input to the process window function.
+ * The output of the process window function is interpreted as a regular non-windowed stream.
+ *
+ * @param initialValue The initial value of the fold
+ * @param foldFunction The fold function that is used for incremental aggregation
+ * @param function The process window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @PublicEvolving
+ def fold[R: TypeInformation, ACC: TypeInformation](
+ initialValue: ACC,
+ foldFunction: (ACC, T) => ACC,
+ function: ProcessWindowFunction[ACC, R, K, W]): DataStream[R] = {
+
+ val cleanedFunction = clean(function)
+ val cleanedFoldFunction = clean(foldFunction)
+
+ val folder = new ScalaFoldFunction[T, ACC](cleanedFoldFunction)
+ val applyFunction = new ScalaProcessWindowFunctionWrapper[ACC, R, K, W](cleanedFunction)
+
+ asScalaStream(javaStream.fold(
+ initialValue,
+ folder,
+ applyFunction,
+ implicitly[TypeInformation[ACC]],
+ implicitly[TypeInformation[R]]))
+ }
+
+ /**
+ * Applies the given fold function to each window. The folded value of each window is
+ * then passed as input to the process window function.
+ * The output of the process window function is interpreted as a regular non-windowed stream.
+ *
+ * @param initialValue The initial value of the fold
+ * @param foldFunction The fold function that is used for incremental aggregation
+ * @param function The process window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @PublicEvolving
+ def fold[R: TypeInformation, ACC: TypeInformation](
+ initialValue: ACC,
+ foldFunction: FoldFunction[T, ACC],
+ function: ProcessWindowFunction[ACC, R, K, W]): DataStream[R] = {
+
+ val cleanedFunction = clean(function)
+ val cleanedFoldFunction = clean(foldFunction)
+
+ val applyFunction = new ScalaProcessWindowFunctionWrapper[ACC, R, K, W](cleanedFunction)
+
+ asScalaStream(javaStream.fold(
+ initialValue,
+ cleanedFoldFunction,
+ applyFunction,
+ implicitly[TypeInformation[ACC]],
+ implicitly[TypeInformation[R]]))
+ }
+
// ---------------------------- apply() -------------------------------------
/**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * Note that this function requires that all data in the windows is buffered until the window
+ * is evaluated, as the function provides no means of pre-aggregation.
+ *
+ * @param function The window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @PublicEvolving
+ def process[R: TypeInformation](
+ function: ProcessWindowFunction[T, R, K, W]): DataStream[R] = {
+
+ val cleanFunction = clean(function)
+ val applyFunction = new ScalaProcessWindowFunctionWrapper[T, R, K, W](cleanFunction)
+ asScalaStream(javaStream.process(applyFunction, implicitly[TypeInformation[R]]))
+ }
+
+ /**
* Applies the given window function to each window. The window function is called for each
* evaluation of the window for each key individually. The output of the window function is
* interpreted as a regular non-windowed stream.
http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
new file mode 100644
index 0000000..79f3918
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function
+
+import java.io.Serializable
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.functions.Function
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+/**
+  * Base abstract class for functions that are evaluated over keyed (grouped)
+  * windows, using a [[Context]] object to retrieve extra information about the window.
+  *
+  * @tparam IN The type of the input value.
+  * @tparam OUT The type of the output value.
+  * @tparam KEY The type of the key.
+  * @tparam W The type of the window.
+  */
+@PublicEvolving
+abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
+  /**
+    * Evaluates the window and outputs none or several elements.
+    *
+    * @param key The key for which this window is evaluated.
+    * @param context The context in which the window is being evaluated.
+    * @param elements The elements in the window being evaluated.
+    * @param out A collector for emitting elements.
+    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+    */
+  @throws[Exception]
+  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
+
+  /**
+    * The context holding window metadata.
+    */
+  abstract class Context {
+    /**
+      * @return The window that is being evaluated.
+      */
+    def window: W
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
new file mode 100644
index 0000000..320685a
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function
+
+import java.beans.Transient
+
+import org.apache.flink.annotation.Public
+import org.apache.flink.api.common.functions.{IterationRuntimeContext, RichFunction, RuntimeContext}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+  * Rich variant of [[ProcessWindowFunction]] for functions that are evaluated over keyed
+  * (grouped) windows; it additionally implements the [[RichFunction]] methods defined below.
+  *
+  * @tparam IN The type of the input value.
+  * @tparam OUT The type of the output value.
+  * @tparam KEY The type of the key.
+  * @tparam W The type of the window.
+  */
+@Public
+abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window]
+    extends ProcessWindowFunction[IN, OUT, KEY, W]
+    with RichFunction {
+
+  // scala.transient, unlike java.beans.Transient, keeps the field out of Java serialization
+  @transient
+  private var runtimeContext: RuntimeContext = null
+
+  // --------------------------------------------------------------------------------------------
+  // Runtime context access
+  // --------------------------------------------------------------------------------------------
+
+  override def setRuntimeContext(t: RuntimeContext): Unit = {
+    this.runtimeContext = t
+  }
+
+  override def getRuntimeContext: RuntimeContext = {
+    if (this.runtimeContext != null) {
+      this.runtimeContext
+    } else {
+      throw new IllegalStateException("The runtime context has not been initialized.")
+    }
+  }
+
+  override def getIterationRuntimeContext: IterationRuntimeContext = {
+    if (this.runtimeContext == null) {
+      throw new IllegalStateException("The runtime context has not been initialized.")
+    } else {
+      // only a context that is itself an IterationRuntimeContext can be handed out here
+      this.runtimeContext match {
+        case iterationRuntimeContext: IterationRuntimeContext => iterationRuntimeContext
+        case _ =>
+          throw new IllegalStateException("This stub is not part of an iteration step function.")
+      }
+    }
+  }
+
+  // --------------------------------------------------------------------------------------------
+  // Default life cycle methods
+  // --------------------------------------------------------------------------------------------
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+  }
+
+  @throws[Exception]
+  override def close(): Unit = {
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
new file mode 100644
index 0000000..4a20371
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function.util
+
+import org.apache.flink.api.common.functions.{IterationRuntimeContext, RuntimeContext}
+import org.apache.flink.api.java.operators.translation.WrappingFunction
+import org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => JProcessWindowFunction}
+import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+
+/**
+ * A wrapper function that exposes a Scala ProcessWindowFunction
+ * as a Java ProcessWindowFunction.
+ *
+ * The Scala and Java ProcessWindowFunctions differ in their type of "Iterable":
+ * - Scala ProcessWindowFunction: scala.Iterable
+ * - Java ProcessWindowFunction: java.lang.Iterable
+ */
+final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
+ private[this] val func: ProcessWindowFunction[IN, OUT, KEY, W])
+ extends WrappingFunction[ProcessWindowFunction[IN, OUT, KEY, W]](func)
+ with JProcessWindowFunctionTrait[IN, OUT, KEY, W] {
+
+ override def process(
+ key: KEY,
+ context: JProcessWindowFunction[IN, OUT, KEY, W]#Context,
+ elements: java.lang.Iterable[IN],
+ out: Collector[OUT]): Unit = {
+ val ctx = new func.Context {
+ override def window = context.window
+ }
+ func.process(key, ctx, elements.asScala, out)
+ }
+
+ override def getRuntimeContext: RuntimeContext = {
+ throw new RuntimeException("This should never be called")
+ }
+
+ override def getIterationRuntimeContext: IterationRuntimeContext = {
+ throw new RuntimeException("This should never be called")
+ }
+}
+
+private trait JProcessWindowFunctionTrait[IN, OUT, KEY, W]
+ extends JProcessWindowFunction[IN, OUT, KEY, W]
http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
index 83697ce..a23145c 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
@@ -26,13 +26,13 @@ import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichWindowFunction, CheckingIdentityRichAllWindowFunction}
+import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Test
+import org.junit.{Ignore, Test}
import org.junit.Assert._
import scala.collection.mutable
@@ -150,6 +150,66 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
}
@Test
+ @Ignore
+ def testFoldWithProcessWindowFunction(): Unit = {
+ WindowFoldITCase.testResults = mutable.MutableList()
+ CheckingIdentityRichProcessWindowFunction.reset()
+
+ val foldFunc = new FoldFunction[(String, Int), (Int, String)] {
+ override def fold(accumulator: (Int, String), value: (String, Int)): (Int, String) = {
+ (accumulator._1 + value._2, accumulator._2 + value._1)
+ }
+ }
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setParallelism(1)
+
+ val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+ def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+ ctx.collect(("a", 0))
+ ctx.collect(("a", 1))
+ ctx.collect(("a", 2))
+ ctx.collect(("b", 3))
+ ctx.collect(("b", 4))
+ ctx.collect(("b", 5))
+ ctx.collect(("a", 6))
+ ctx.collect(("a", 7))
+ ctx.collect(("a", 8))
+
+ // source is finite, so it will have an implicit MAX watermark when it finishes
+ }
+
+ def cancel() {
+ }
+ }).assignTimestampsAndWatermarks(new WindowFoldITCase.Tuple2TimestampExtractor)
+
+ source1
+ .keyBy(0)
+ .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+ .fold(
+ (0, "R:"),
+ foldFunc,
+ new CheckingIdentityRichProcessWindowFunction[(Int, String), Tuple, TimeWindow]())
+ .addSink(new SinkFunction[(Int, String)]() {
+ def invoke(value: (Int, String)) {
+ WindowFoldITCase.testResults += value.toString
+ }
+ })
+
+ env.execute("Fold Process Window Test")
+
+ val expectedResult = mutable.MutableList(
+ "(3,R:aaa)",
+ "(21,R:aaa)",
+ "(12,R:bbb)")
+
+ assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted)
+
+ CheckingIdentityRichProcessWindowFunction.checkRichMethodCalls()
+ }
+
+ @Test
def testFoldAllWindow(): Unit = {
WindowFoldITCase.testResults = mutable.MutableList()
http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
index c38f422..bfbe6ee 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
@@ -25,13 +25,13 @@ import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichWindowFunction}
+import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Ignore, Test}
import scala.collection.mutable
@@ -87,6 +87,56 @@ class WindowFunctionITCase {
}
@Test
+ @Ignore
+ def testRichProcessWindowFunction(): Unit = {
+ WindowFunctionITCase.testResults = mutable.MutableList()
+ CheckingIdentityRichProcessWindowFunction.reset()
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setParallelism(1)
+
+ val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+ def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+ ctx.collect(("a", 0))
+ ctx.collect(("a", 1))
+ ctx.collect(("a", 2))
+ ctx.collect(("b", 3))
+ ctx.collect(("b", 4))
+ ctx.collect(("b", 5))
+ ctx.collect(("a", 6))
+ ctx.collect(("a", 7))
+ ctx.collect(("a", 8))
+
+ // source is finite, so it will have an implicit MAX watermark when it finishes
+ }
+
+ def cancel() {}
+
+ }).assignTimestampsAndWatermarks(new WindowFunctionITCase.Tuple2TimestampExtractor)
+
+ source1
+ .keyBy(0)
+ .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+ .process(new CheckingIdentityRichProcessWindowFunction[(String, Int), Tuple, TimeWindow]())
+ .addSink(new SinkFunction[(String, Int)]() {
+ def invoke(value: (String, Int)) {
+ WindowFunctionITCase.testResults += value.toString
+ }
+ })
+
+ env.execute("RichProcessWindowFunction Test")
+
+ val expectedResult = mutable.MutableList(
+ "(a,0)", "(a,1)", "(a,2)", "(a,6)", "(a,7)", "(a,8)",
+ "(b,3)", "(b,4)", "(b,5)")
+
+ assertEquals(expectedResult.sorted, WindowFunctionITCase.testResults.sorted)
+
+ CheckingIdentityRichProcessWindowFunction.checkRichMethodCalls()
+ }
+
+ @Test
def testRichAllWindowFunction(): Unit = {
WindowFunctionITCase.testResults = mutable.MutableList()
CheckingIdentityRichAllWindowFunction.reset()
http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
index 9666266..5418108 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
@@ -26,15 +26,14 @@ import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichWindowFunction}
+import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-
import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Ignore, Test}
import scala.collection.mutable
@@ -150,6 +149,65 @@ class WindowReduceITCase extends StreamingMultipleProgramsTestBase {
}
@Test
+ @Ignore
+ def testReduceWithProcessWindowFunction(): Unit = {
+ WindowReduceITCase.testResults = mutable.MutableList()
+ CheckingIdentityRichProcessWindowFunction.reset()
+
+ val reduceFunc = new ReduceFunction[(String, Int)] {
+ override def reduce(a: (String, Int), b: (String, Int)): (String, Int) = {
+ (a._1 + b._1, a._2 + b._2)
+ }
+ }
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setParallelism(1)
+
+ val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+ def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+ ctx.collect(("a", 0))
+ ctx.collect(("a", 1))
+ ctx.collect(("a", 2))
+ ctx.collect(("b", 3))
+ ctx.collect(("b", 4))
+ ctx.collect(("b", 5))
+ ctx.collect(("a", 6))
+ ctx.collect(("a", 7))
+ ctx.collect(("a", 8))
+
+ // source is finite, so it will have an implicit MAX watermark when it finishes
+ }
+
+ def cancel() {
+ }
+ }).assignTimestampsAndWatermarks(new WindowReduceITCase.Tuple2TimestampExtractor)
+
+ source1
+ .keyBy(0)
+ .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+ .reduce(
+ reduceFunc,
+ new CheckingIdentityRichProcessWindowFunction[(String, Int), Tuple, TimeWindow]())
+ .addSink(new SinkFunction[(String, Int)]() {
+ def invoke(value: (String, Int)) {
+ WindowReduceITCase.testResults += value.toString
+ }
+ })
+
+ env.execute("Reduce Process Window Test")
+
+ val expectedResult = mutable.MutableList(
+ "(aaa,3)",
+ "(aaa,21)",
+ "(bbb,12)")
+
+ assertEquals(expectedResult.sorted, WindowReduceITCase.testResults.sorted)
+
+ CheckingIdentityRichProcessWindowFunction.checkRichMethodCalls()
+ }
+
+ @Test
def testReduceAllWindow(): Unit = {
WindowReduceITCase.testResults = mutable.MutableList()
http://git-wip-us.apache.org/repos/asf/flink/blob/86dff0e6/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
new file mode 100644
index 0000000..d62f2d3
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.testutils
+
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.scala.function.RichProcessWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+
+class CheckingIdentityRichProcessWindowFunction[T, K, W <: Window]
+ extends RichProcessWindowFunction[T, T, K, W] {
+
+ override def process(key: K, context: Context, input: Iterable[T], out: Collector[T]): Unit = {
+ for (value <- input) {
+ out.collect(value)
+ }
+ }
+
+ override def open(conf: Configuration): Unit = {
+ super.open(conf)
+ CheckingIdentityRichProcessWindowFunction.openCalled = true
+ }
+
+ override def close(): Unit = {
+ super.close()
+ CheckingIdentityRichProcessWindowFunction.closeCalled = true
+ }
+
+ override def setRuntimeContext(context: RuntimeContext): Unit = {
+ super.setRuntimeContext(context)
+ CheckingIdentityRichProcessWindowFunction.contextSet = true
+ }
+}
+
+object CheckingIdentityRichProcessWindowFunction {
+
+ @volatile
+ private[CheckingIdentityRichProcessWindowFunction] var closeCalled = false
+
+ @volatile
+ private[CheckingIdentityRichProcessWindowFunction] var openCalled = false
+
+ @volatile
+ private[CheckingIdentityRichProcessWindowFunction] var contextSet = false
+
+ def reset(): Unit = {
+ closeCalled = false
+ openCalled = false
+ contextSet = false
+ }
+
+ def checkRichMethodCalls(): Unit = {
+ if (!contextSet) {
+ throw new AssertionError("context not set")
+ }
+ if (!openCalled) {
+ throw new AssertionError("open() not called")
+ }
+ if (!closeCalled) {
+ throw new AssertionError("close() not called")
+ }
+ }
+}