You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/07 22:33:55 UTC
[6/8] flink git commit: [FLINK-2550] Rename reduceWindow to reduce on *WindowedStream, add Lambda Reduce
[FLINK-2550] Rename reduceWindow to reduce on *WindowedStream, add Lambda Reduce
Lambda Reduce is the reduce method that takes a Scala Lambda function.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bac272c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bac272c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bac272c
Branch: refs/heads/master
Commit: 0bac272c8309f2e7567ba762076bd75eeb8ea83a
Parents: 8634dbb
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Oct 6 17:36:33 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 7 22:08:25 2015 +0200
----------------------------------------------------------------------
.../api/datastream/AllWindowedStream.java | 2 +-
.../api/datastream/WindowedStream.java | 2 +-
.../windowing/AllWindowTranslationTest.java | 6 ++--
.../windowing/TimeWindowTranslationTest.java | 4 +--
.../windowing/WindowTranslationTest.java | 6 ++--
.../GroupedProcessingTimeWindowExample.java | 2 +-
.../streaming/api/scala/AllWindowedStream.scala | 31 ++++++++++++++++++--
.../streaming/api/scala/WindowedStream.scala | 31 ++++++++++++++++++--
.../api/scala/AllWindowTranslationTest.scala | 6 ++--
.../api/scala/WindowTranslationTest.scala | 6 ++--
10 files changed, 75 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 78ba8ad..0cc1854 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -120,7 +120,7 @@ public class AllWindowedStream<T, W extends Window> {
* @param function The reduce function.
* @return The data stream that is the result of applying the reduce function to the window.
*/
- public DataStream<T> reduceWindow(ReduceFunction<T> function) {
+ public DataStream<T> reduce(ReduceFunction<T> function) {
//clean the closure
function = input.getExecutionEnvironment().clean(function);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 349651e..0ea9cad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -131,7 +131,7 @@ public class WindowedStream<T, K, W extends Window> {
* @param function The reduce function.
* @return The data stream that is the result of applying the reduce function to the window.
*/
- public DataStream<T> reduceWindow(ReduceFunction<T> function) {
+ public DataStream<T> reduce(ReduceFunction<T> function) {
//clean the closure
function = input.getExecutionEnvironment().clean(function);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 767b40c..09a7149 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -66,7 +66,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
- .reduceWindow(reducer);
+ .reduce(reducer);
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
@@ -103,7 +103,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.trigger(CountTrigger.of(100))
- .reduceWindow(reducer);
+ .reduce(reducer);
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
@@ -149,7 +149,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.evictor(CountEvictor.of(100))
- .reduceWindow(reducer);
+ .reduce(reducer);
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
index 76d7bfe..76c6f20 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
@@ -59,7 +59,7 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
- .reduceWindow(reducer);
+ .reduce(reducer);
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
@@ -103,7 +103,7 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase
DataStream<Tuple2<String, Integer>> window1 = source
.timeWindowAll(Time.of(1000, TimeUnit.MILLISECONDS),
Time.of(100, TimeUnit.MILLISECONDS))
- .reduceWindow(reducer);
+ .reduce(reducer);
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 9dc6687..5124add 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -66,7 +66,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
.keyBy(0)
.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
- .reduceWindow(reducer);
+ .reduce(reducer);
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
@@ -105,7 +105,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
.keyBy(0)
.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.trigger(CountTrigger.of(100))
- .reduceWindow(reducer);
+ .reduce(reducer);
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
@@ -153,7 +153,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
.keyBy(0)
.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.evictor(CountEvictor.of(100))
- .reduceWindow(reducer);
+ .reduce(reducer);
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
index 5d32b8e..982b73d 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -79,7 +79,7 @@ public class GroupedProcessingTimeWindowExample {
stream
.keyBy(0)
.timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
- .reduceWindow(new SummingReducer())
+ .reduce(new SummingReducer())
// alternative: use a apply function which does not pre-aggregate
// .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 4f36722..9054d95 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -73,6 +73,26 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
// ------------------------------------------------------------------------
// Operations on the keyed windows
// ------------------------------------------------------------------------
+
+ /**
+ * Applies a reduce function to the window. The window function is called for each evaluation
+ * of the window for each key individually. The output of the reduce function is interpreted
+ * as a regular non-windowed stream.
+ *
+ * This window will try and pre-aggregate data as much as the window policies permit. For example,
+ * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
+ * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide
+ * interval, so a few elements are stored per key (one per slide interval).
+ * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
+ * aggregation tree.
+ *
+ * @param function The reduce function.
+ * @return The data stream that is the result of applying the reduce function to the window.
+ */
+ def reduce(function: ReduceFunction[T]): DataStream[T] = {
+ javaStream.reduce(clean(function))
+ }
+
/**
* Applies a reduce function to the window. The window function is called for each evaluation
* of the window for each key individually. The output of the reduce function is interpreted
@@ -88,8 +108,15 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* @param function The reduce function.
* @return The data stream that is the result of applying the reduce function to the window.
*/
- def reduceWindow(function: ReduceFunction[T]): DataStream[T] = {
- javaStream.reduceWindow(clean(function))
+ def reduce(function: (T, T) => T): DataStream[T] = {
+ if (function == null) {
+ throw new NullPointerException("Reduce function must not be null.")
+ }
+ val cleanFun = clean(function)
+ val reducer = new ReduceFunction[T] {
+ def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+ }
+ reduce(reducer)
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index a688846..2d6806d 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -76,6 +76,26 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
// ------------------------------------------------------------------------
// Operations on the keyed windows
// ------------------------------------------------------------------------
+
+ /**
+ * Applies a reduce function to the window. The window function is called for each evaluation
+ * of the window for each key individually. The output of the reduce function is interpreted
+ * as a regular non-windowed stream.
+ *
+ * This window will try and pre-aggregate data as much as the window policies permit. For example,
+ * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
+ * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide
+ * interval, so a few elements are stored per key (one per slide interval).
+ * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
+ * aggregation tree.
+ *
+ * @param function The reduce function.
+ * @return The data stream that is the result of applying the reduce function to the window.
+ */
+ def reduce(function: ReduceFunction[T]): DataStream[T] = {
+ javaStream.reduce(clean(function))
+ }
+
/**
* Applies a reduce function to the window. The window function is called for each evaluation
* of the window for each key individually. The output of the reduce function is interpreted
@@ -91,8 +111,15 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* @param function The reduce function.
* @return The data stream that is the result of applying the reduce function to the window.
*/
- def reduceWindow(function: ReduceFunction[T]): DataStream[T] = {
- javaStream.reduceWindow(clean(function))
+ def reduce(function: (T, T) => T): DataStream[T] = {
+ if (function == null) {
+ throw new NullPointerException("Reduce function must not be null.")
+ }
+ val cleanFun = clean(function)
+ val reducer = new ReduceFunction[T] {
+ def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+ }
+ reduce(reducer)
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index 247256f..dece9f6 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -58,7 +58,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
.windowAll(SlidingProcessingTimeWindows.of(
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
- .reduceWindow(reducer)
+ .reduce(reducer)
val transform1 = window1.getJavaStream.getTransformation
.asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
@@ -100,7 +100,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.trigger(CountTrigger.of(100))
- .reduceWindow(reducer)
+ .reduce(reducer)
val transform1 = window1.getJavaStream.getTransformation
.asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
@@ -150,7 +150,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
- .reduceWindow(reducer)
+ .reduce(reducer)
val transform1 = window1.getJavaStream.getTransformation
.asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index f1b05c6..fa9c0a9 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -55,7 +55,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
.window(SlidingProcessingTimeWindows.of(
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
- .reduceWindow(reducer)
+ .reduce(reducer)
val transform1 = window1.getJavaStream.getTransformation
.asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
@@ -99,7 +99,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.trigger(CountTrigger.of(100))
- .reduceWindow(reducer)
+ .reduce(reducer)
val transform1 = window1.getJavaStream.getTransformation
.asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
@@ -152,7 +152,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
- .reduceWindow(reducer)
+ .reduce(reducer)
val transform1 = window1.getJavaStream.getTransformation
.asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]