You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/07 22:33:55 UTC

[6/8] flink git commit: [FLINK-2550] Rename reduceWindow to reduce on *WindowedStream, add Lambda Reduce

[FLINK-2550] Rename reduceWindow to reduce on *WindowedStream, add Lambda Reduce

Lambda Reduce is the new reduce overload on the Scala *WindowedStream classes that takes a Scala lambda function ((T, T) => T) instead of a ReduceFunction.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bac272c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bac272c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bac272c

Branch: refs/heads/master
Commit: 0bac272c8309f2e7567ba762076bd75eeb8ea83a
Parents: 8634dbb
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Oct 6 17:36:33 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 7 22:08:25 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       |  2 +-
 .../api/datastream/WindowedStream.java          |  2 +-
 .../windowing/AllWindowTranslationTest.java     |  6 ++--
 .../windowing/TimeWindowTranslationTest.java    |  4 +--
 .../windowing/WindowTranslationTest.java        |  6 ++--
 .../GroupedProcessingTimeWindowExample.java     |  2 +-
 .../streaming/api/scala/AllWindowedStream.scala | 31 ++++++++++++++++++--
 .../streaming/api/scala/WindowedStream.scala    | 31 ++++++++++++++++++--
 .../api/scala/AllWindowTranslationTest.scala    |  6 ++--
 .../api/scala/WindowTranslationTest.scala       |  6 ++--
 10 files changed, 75 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 78ba8ad..0cc1854 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -120,7 +120,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param function The reduce function.
 	 * @return The data stream that is the result of applying the reduce function to the window. 
 	 */
-	public DataStream<T> reduceWindow(ReduceFunction<T> function) {
+	public DataStream<T> reduce(ReduceFunction<T> function) {
 		//clean the closure
 		function = input.getExecutionEnvironment().clean(function);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 349651e..0ea9cad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -131,7 +131,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param function The reduce function.
 	 * @return The data stream that is the result of applying the reduce function to the window. 
 	 */
-	public DataStream<T> reduceWindow(ReduceFunction<T> function) {
+	public DataStream<T> reduce(ReduceFunction<T> function) {
 		//clean the closure
 		function = input.getExecutionEnvironment().clean(function);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 767b40c..09a7149 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -66,7 +66,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-				.reduceWindow(reducer);
+				.reduce(reducer);
 
 		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
@@ -103,7 +103,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.trigger(CountTrigger.of(100))
-				.reduceWindow(reducer);
+				.reduce(reducer);
 
 		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
@@ -149,7 +149,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.evictor(CountEvictor.of(100))
-				.reduceWindow(reducer);
+				.reduce(reducer);
 
 		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
index 76d7bfe..76c6f20 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
@@ -59,7 +59,7 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(0)
 				.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
-				.reduceWindow(reducer);
+				.reduce(reducer);
 
 		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
@@ -103,7 +103,7 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.timeWindowAll(Time.of(1000, TimeUnit.MILLISECONDS),
 						Time.of(100, TimeUnit.MILLISECONDS))
-				.reduceWindow(reducer);
+				.reduce(reducer);
 
 		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 9dc6687..5124add 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -66,7 +66,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 				.keyBy(0)
 				.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS),
 						Time.of(100, TimeUnit.MILLISECONDS)))
-				.reduceWindow(reducer);
+				.reduce(reducer);
 
 		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
@@ -105,7 +105,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 				.keyBy(0)
 				.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.trigger(CountTrigger.of(100))
-				.reduceWindow(reducer);
+				.reduce(reducer);
 
 		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
@@ -153,7 +153,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 				.keyBy(0)
 				.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.evictor(CountEvictor.of(100))
-				.reduceWindow(reducer);
+				.reduce(reducer);
 
 		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
index 5d32b8e..982b73d 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -79,7 +79,7 @@ public class GroupedProcessingTimeWindowExample {
 		stream
 			.keyBy(0)
 			.timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
-			.reduceWindow(new SummingReducer())
+			.reduce(new SummingReducer())
 
 			// alternative: use a apply function which does not pre-aggregate
 //			.keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())

http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 4f36722..9054d95 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -73,6 +73,26 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
   // ------------------------------------------------------------------------
   //  Operations on the keyed windows
   // ------------------------------------------------------------------------
+
+  /**
+   * Applies a reduce function to the window. The window function is called for each evaluation
+   * of the window for each key individually. The output of the reduce function is interpreted
+   * as a regular non-windowed stream.
+   *
+   * This window will try and pre-aggregate data as much as the window policies permit. For example,
+   * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
+   * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide
+   * interval, so a few elements are stored per key (one per slide interval).
+   * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
+   * aggregation tree.
+   *
+   * @param function The reduce function.
+   * @return The data stream that is the result of applying the reduce function to the window.
+   */
+  def reduce(function: ReduceFunction[T]): DataStream[T] = {
+    javaStream.reduce(clean(function))
+  }
+
   /**
    * Applies a reduce function to the window. The window function is called for each evaluation
    * of the window for each key individually. The output of the reduce function is interpreted
@@ -88,8 +108,15 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    * @param function The reduce function.
    * @return The data stream that is the result of applying the reduce function to the window.
    */
-  def reduceWindow(function: ReduceFunction[T]): DataStream[T] = {
-    javaStream.reduceWindow(clean(function))
+  def reduce(function: (T, T) => T): DataStream[T] = {
+    if (function == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    val cleanFun = clean(function)
+    val reducer = new ReduceFunction[T] {
+      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+    }
+    reduce(reducer)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index a688846..2d6806d 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -76,6 +76,26 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
   // ------------------------------------------------------------------------
   //  Operations on the keyed windows
   // ------------------------------------------------------------------------
+
+  /**
+   * Applies a reduce function to the window. The window function is called for each evaluation
+   * of the window for each key individually. The output of the reduce function is interpreted
+   * as a regular non-windowed stream.
+   *
+   * This window will try and pre-aggregate data as much as the window policies permit. For example,
+   * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
+   * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide
+   * interval, so a few elements are stored per key (one per slide interval).
+   * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
+   * aggregation tree.
+   *
+   * @param function The reduce function.
+   * @return The data stream that is the result of applying the reduce function to the window.
+   */
+  def reduce(function: ReduceFunction[T]): DataStream[T] = {
+    javaStream.reduce(clean(function))
+  }
+
   /**
    * Applies a reduce function to the window. The window function is called for each evaluation
    * of the window for each key individually. The output of the reduce function is interpreted
@@ -91,8 +111,15 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    * @param function The reduce function.
    * @return The data stream that is the result of applying the reduce function to the window.
    */
-  def reduceWindow(function: ReduceFunction[T]): DataStream[T] = {
-    javaStream.reduceWindow(clean(function))
+  def reduce(function: (T, T) => T): DataStream[T] = {
+    if (function == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    val cleanFun = clean(function)
+    val reducer = new ReduceFunction[T] {
+      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+    }
+    reduce(reducer)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index 247256f..dece9f6 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -58,7 +58,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
       .windowAll(SlidingProcessingTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
-      .reduceWindow(reducer)
+      .reduce(reducer)
 
     val transform1 = window1.getJavaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
@@ -100,7 +100,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .trigger(CountTrigger.of(100))
-      .reduceWindow(reducer)
+      .reduce(reducer)
 
     val transform1 = window1.getJavaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
@@ -150,7 +150,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
-      .reduceWindow(reducer)
+      .reduce(reducer)
 
     val transform1 = window1.getJavaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index f1b05c6..fa9c0a9 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -55,7 +55,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
       .window(SlidingProcessingTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
-      .reduceWindow(reducer)
+      .reduce(reducer)
 
     val transform1 = window1.getJavaStream.getTransformation
         .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
@@ -99,7 +99,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .trigger(CountTrigger.of(100))
-      .reduceWindow(reducer)
+      .reduce(reducer)
 
     val transform1 = window1.getJavaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
@@ -152,7 +152,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
-      .reduceWindow(reducer)
+      .reduce(reducer)
 
     val transform1 = window1.getJavaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]