Posted to commits@flink.apache.org by al...@apache.org on 2017/10/21 07:35:14 UTC

[1/2] flink git commit: [FLINK-5968] Add documentation for WindowedStream.aggregate()

Repository: flink
Updated Branches:
  refs/heads/master ebc3bc1f9 -> 558c71d2e


[FLINK-5968] Add documentation for WindowedStream.aggregate()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/558c71d2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/558c71d2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/558c71d2

Branch: refs/heads/master
Commit: 558c71d2e24515515f9c71d98e8176205b5dd854
Parents: f176c91
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sat Oct 14 10:45:33 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Oct 21 09:33:38 2017 +0200

----------------------------------------------------------------------
 docs/dev/stream/operators/windows.md | 214 ++++++++++++++++++++++++++++--
 1 file changed, 203 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/558c71d2/docs/dev/stream/operators/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md
index 52c4b47..f31d2e5 100644
--- a/docs/dev/stream/operators/windows.md
+++ b/docs/dev/stream/operators/windows.md
@@ -41,7 +41,7 @@ for the rest of the page.
           [.evictor(...)]            <-  optional: "evictor" (else no evictor)
           [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
           [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
-           .reduce/fold/apply()      <-  required: "function"
+           .reduce/aggregate/fold/apply()      <-  required: "function"
 
 **Non-Keyed Windows**
 
@@ -51,7 +51,7 @@ for the rest of the page.
           [.evictor(...)]            <-  optional: "evictor" (else no evictor)
           [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
           [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
-           .reduce/fold/apply()      <-  required: "function"
+           .reduce/aggregate/fold/apply()      <-  required: "function"
 
 In the above, the commands in square brackets ([...]) are optional. This reveals that Flink allows you to customize your
 windowing logic in many different ways so that it best fits your needs.
@@ -70,8 +70,8 @@ lateness of 1 min, Flink will create a new window for the interval between `12:0
 a timestamp that falls into this interval arrives, and it will remove it when the watermark passes the `12:06`
 timestamp.
 
-In addition, each window will have a `Trigger` (see [Triggers](#triggers)) and a function (`ProcessWindowFunction`, `ReduceFunction` or
-`FoldFunction`) (see [Window Functions](#window-functions)) attached to it. The function will contain the computation to
+In addition, each window will have a `Trigger` (see [Triggers](#triggers)) and a function (`ProcessWindowFunction`, `ReduceFunction`,
+`AggregateFunction` or `FoldFunction`) (see [Window Functions](#window-functions)) attached to it. The function will contain the computation to
 be applied to the contents of the window, while the `Trigger` specifies the conditions under which the window is
 considered ready for the function to be applied. A triggering policy might be something like "when the number of elements
 in the window is more than 4", or "when the watermark passes the end of the window". A trigger can also decide to
@@ -333,7 +333,7 @@ they are  evaluated differently than tumbling and sliding windows. Internally, a
 creates a new window for each arriving record and merges windows together if they are closer to each other
 than the defined gap.
 In order to be mergeable, a session window operator requires a merging [Trigger](#triggers) and a merging
-[Window Function](#window-functions), such as `ReduceFunction` or `ProcessWindowFunction`
+[Window Function](#window-functions), such as `ReduceFunction`, `AggregateFunction`, or `ProcessWindowFunction`
 (`FoldFunction` cannot merge.)
 
 ### Global Windows
@@ -378,14 +378,14 @@ to perform on each of these windows. This is the responsibility of the *window f
 elements of each (possibly keyed) window once the system determines that a window is ready for processing
 (see [triggers](#triggers) for how Flink determines when a window is ready).
 
-The window function can be one of `ReduceFunction`, `FoldFunction` or `ProcessWindowFunction`. The first
+The window function can be one of `ReduceFunction`, `AggregateFunction`, `FoldFunction` or `ProcessWindowFunction`. The first
 three can be executed more efficiently (see [State Size](#state size) section) because Flink can incrementally aggregate
 the elements for each window as they arrive. A `ProcessWindowFunction` gets an `Iterable` for all the elements contained in a
 window and additional meta information about the window to which the elements belong.
 
 A windowed transformation with a `ProcessWindowFunction` cannot be executed as efficiently as the other
 cases because Flink has to buffer *all* elements for a window internally before invoking the function.
-This can be mitigated by combining a `ProcessWindowFunction` with a `ReduceFunction` or `FoldFunction` to
+This can be mitigated by combining a `ProcessWindowFunction` with a `ReduceFunction`, `AggregateFunction`, or `FoldFunction` to
 get both incremental aggregation of window elements and the additional window metadata that the
 `ProcessWindowFunction` receives. We will look at examples for each of these variants.
 
@@ -427,6 +427,93 @@ input
 
 The above example sums up the second fields of the tuples for all elements in a window.
 
+### AggregateFunction
+
+An `AggregateFunction` is a generalized version of a `ReduceFunction` that has three types: an
+input type (`IN`), accumulator type (`ACC`), and an output type (`OUT`). The input type is the type
+of elements in the input stream and the `AggregateFunction` has a method for adding one input
+element to an accumulator. The interface also has methods for creating an initial accumulator,
+for merging two accumulators into one accumulator and for extracting an output (of type `OUT`) from
+an accumulator. We will see how this works in the example below.
+
+As with `ReduceFunction`, Flink will incrementally aggregate input elements of a window as they
+arrive.
+
+An `AggregateFunction` can be defined and used like this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+/**
+ * The accumulator is used to keep a running sum and a count. The {@code getResult} method
+ * computes the average.
+ */
+private static class AverageAggregate
+    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
+  @Override
+  public Tuple2<Long, Long> createAccumulator() {
+    return new Tuple2<>(0L, 0L);
+  }
+
+  @Override
+  public Tuple2<Long, Long> add(
+    Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
+    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
+  }
+
+  @Override
+  public Double getResult(Tuple2<Long, Long> accumulator) {
+    return ((double) accumulator.f0) / accumulator.f1;
+  }
+
+  @Override
+  public Tuple2<Long, Long> merge(
+    Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
+    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
+  }
+}
+
+DataStream<Tuple2<String, Long>> input = ...;
+
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .aggregate(new AverageAggregate());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+/**
+ * The accumulator is used to keep a running sum and a count. The [getResult] method
+ * computes the average.
+ */
+class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
+  override def createAccumulator() = (0L, 0L)
+
+  override def add(value: (String, Long), accumulator: (Long, Long)) =
+    (accumulator._1 + value._2, accumulator._2 + 1L)
+
+  override def getResult(accumulator: (Long, Long)) = accumulator._1.toDouble / accumulator._2
+
+  override def merge(a: (Long, Long), b: (Long, Long)) =
+    (a._1 + b._1, a._2 + b._2)
+}
+
+val input: DataStream[(String, Long)] = ...
+
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .aggregate(new AverageAggregate)
+{% endhighlight %}
+</div>
+</div>
+
+The above example computes the average of the second field of the elements in the window.
+
 ### FoldFunction
 
 A `FoldFunction` specifies how an input element of the window is combined with an element of
@@ -645,11 +732,11 @@ class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), Stri
 
 The example shows a `ProcessWindowFunction` that counts the elements in a window. In addition, the window function adds information about the window to the output.
 
-<span class="label label-danger">Attention</span> Note that using `ProcessWindowFunction` for simple aggregates such as count is quite inefficient. The next section shows how a `ReduceFunction` can be combined with a `ProcessWindowFunction` to get both incremental aggregation and the added information of a `ProcessWindowFunction`.
+<span class="label label-danger">Attention</span> Note that using `ProcessWindowFunction` for simple aggregates such as count is quite inefficient. The next section shows how a `ReduceFunction` or `AggregateFunction` can be combined with a `ProcessWindowFunction` to get both incremental aggregation and the added information of a `ProcessWindowFunction`.
 
 ### ProcessWindowFunction with Incremental Aggregation
 
-A `ProcessWindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction` to
+A `ProcessWindowFunction` can be combined with a `ReduceFunction`, an `AggregateFunction`, or a `FoldFunction` to
 incrementally aggregate elements as they arrive in the window.
 When the window is closed, the `ProcessWindowFunction` will be provided with the aggregated result.
 This allows windows to be computed incrementally while having access to the
@@ -721,6 +808,111 @@ input
 </div>
 </div>
 
+#### Incremental Window Aggregation with AggregateFunction
+
+The following example shows how an incremental `AggregateFunction` can be combined with
+a `ProcessWindowFunction` to compute the average and also emit the key along with
+the average.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple2<String, Long>> input = ...;
+
+input
+  .keyBy(<key selector>)
+  .window(<window assigner>)
+  .aggregate(new AverageAggregate(), new MyProcessWindowFunction());
+
+// Function definitions
+
+/**
+ * The accumulator is used to keep a running sum and a count. The {@code getResult} method
+ * computes the average.
+ */
+private static class AverageAggregate
+    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
+  @Override
+  public Tuple2<Long, Long> createAccumulator() {
+    return new Tuple2<>(0L, 0L);
+  }
+
+  @Override
+  public Tuple2<Long, Long> add(
+    Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
+    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
+  }
+
+  @Override
+  public Double getResult(Tuple2<Long, Long> accumulator) {
+    return ((double) accumulator.f0) / accumulator.f1;
+  }
+
+  @Override
+  public Tuple2<Long, Long> merge(
+    Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
+    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
+  }
+}
+
+private static class MyProcessWindowFunction
+    extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {
+
+  public void process(String key,
+                      Context context,
+                      Iterable<Double> averages,
+                      Collector<Tuple2<String, Double>> out) {
+      Double average = averages.iterator().next();
+      out.collect(new Tuple2<>(key, average));
+  }
+}
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val input: DataStream[(String, Long)] = ...
+
+input
+  .keyBy(<key selector>)
+  .window(<window assigner>)
+  .aggregate(new AverageAggregate(), new MyProcessWindowFunction())
+
+// Function definitions
+
+/**
+ * The accumulator is used to keep a running sum and a count. The [getResult] method
+ * computes the average.
+ */
+class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
+  override def createAccumulator() = (0L, 0L)
+
+  override def add(value: (String, Long), accumulator: (Long, Long)) =
+    (accumulator._1 + value._2, accumulator._2 + 1L)
+
+  override def getResult(accumulator: (Long, Long)) = accumulator._1.toDouble / accumulator._2
+
+  override def merge(a: (Long, Long), b: (Long, Long)) =
+    (a._1 + b._1, a._2 + b._2)
+}
+
+class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] {
+
+  def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double)]): Unit = {
+    val average = averages.iterator.next()
+    out.collect((key, average))
+  }
+}
+
+{% endhighlight %}
+</div>
+</div>
+
 #### Incremental Window Aggregation with FoldFunction
 
 The following example shows how an incremental `FoldFunction` can be combined with
@@ -892,7 +1084,7 @@ Two things to notice about the above methods are:
 Once a trigger determines that a window is ready for processing, it fires, *i.e.*, it returns `FIRE` or `FIRE_AND_PURGE`. This is the signal for the window operator
 to emit the result of the current window. Given a window with a `ProcessWindowFunction`
 all elements are passed to the `ProcessWindowFunction` (possibly after passing them to an evictor).
-Windows with `ReduceFunction` of `FoldFunction` simply emit their eagerly aggregated result.
+Windows with `ReduceFunction`, `AggregateFunction`, or `FoldFunction` simply emit their eagerly aggregated result.
 
 When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While `FIRE` keeps the contents of the window, `FIRE_AND_PURGE` removes its content.
 By default, the pre-implemented triggers simply `FIRE` without purging the window state.
@@ -1162,7 +1354,7 @@ Windows can be defined over long periods of time (such as days, weeks, or months
 
 1. Flink creates one copy of each element per window to which it belongs. Given this, tumbling windows keep one copy of each element (an element belongs to exactly one window unless it is dropped late). In contrast, sliding windows create several copies of each element, as explained in the [Window Assigners](#window-assigners) section. Hence, a sliding window of size 1 day and slide 1 second might not be a good idea.
 
-2. `FoldFunction` and `ReduceFunction` can significantly reduce the storage requirements, as they eagerly aggregate elements and store only one value per window. In contrast, just using a `ProcessWindowFunction` requires accumulating all elements.
+2. `ReduceFunction`, `AggregateFunction`, and `FoldFunction` can significantly reduce the storage requirements, as they eagerly aggregate elements and store only one value per window. In contrast, just using a `ProcessWindowFunction` requires accumulating all elements.
 
 3. Using an `Evictor` prevents any pre-aggregation, as all the elements of a window have to be passed through the evictor before applying the computation (see [Evictors](#evictors)).
 

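The `AggregateFunction` contract documented in the patch above (create an accumulator, add elements, merge accumulators, extract a result) can be sketched without a Flink dependency. The `SimpleAggregate` interface below is a hypothetical stand-in that only mirrors the four methods named in the docs; it is not Flink's interface, and `AverageSketch` and `accumulate` are names assumed for illustration. Returning `NaN` for an empty accumulator is a choice made by this sketch, not something the patch specifies.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in mirroring the four documented methods; not Flink's interface. */
interface SimpleAggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

/** Running {sum, count} accumulator whose result is the average. */
final class AverageSketch implements SimpleAggregate<Long, long[], Double> {

    @Override
    public long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    @Override
    public long[] add(Long value, long[] acc) {
        // Build a new accumulator rather than mutating the argument.
        return new long[] {acc[0] + value, acc[1] + 1L};
    }

    @Override
    public Double getResult(long[] acc) {
        // Cast before dividing; long / long would truncate the average.
        // NaN for an empty accumulator is an assumption of this sketch.
        return acc[1] == 0L ? Double.NaN : ((double) acc[0]) / acc[1];
    }

    @Override
    public long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }

    /** Adds the values one at a time to a fresh accumulator. */
    static long[] accumulate(AverageSketch fn, List<Long> values) {
        long[] acc = fn.createAccumulator();
        for (Long value : values) {
            acc = fn.add(value, acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        AverageSketch fn = new AverageSketch();
        long[] left = accumulate(fn, Arrays.asList(1L, 2L));
        long[] right = accumulate(fn, Arrays.asList(3L, 4L));
        // Two partial accumulators are merged before the result is extracted.
        System.out.println(fn.getResult(fn.merge(left, right)));
    }
}
```

Keeping the sum and count separate until `getResult` is what makes `merge` possible: two averages cannot be combined directly, but two (sum, count) pairs can.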

[2/2] flink git commit: [FLINK-7837] Extend AggregateFunction.add() to work with immutable types

Posted by al...@apache.org.
[FLINK-7837] Extend AggregateFunction.add() to work with immutable types


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f176c917
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f176c917
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f176c917

Branch: refs/heads/master
Commit: f176c917d56fa682c047930be9e579b635d7268b
Parents: ebc3bc1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 13 19:17:01 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Oct 21 09:33:38 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBAggregatingState.java          |   4 +-
 .../api/common/functions/AggregateFunction.java |   7 +-
 .../common/functions/RichAggregateFunction.java |   2 +-
 .../aggregate/AggregateAggFunction.scala        |   3 +-
 .../itcases/AbstractQueryableStateTestBase.java |  34 ++-
 .../state/ImmutableAggregatingStateTest.java    |  32 ++-
 .../state/heap/HeapAggregatingState.java        |   5 +-
 .../runtime/state/StateBackendTestBase.java     | 210 ++++++++++++++++++-
 .../AggregateApplyAllWindowFunction.java        |   4 +-
 .../windowing/AggregateApplyWindowFunction.java |   4 +-
 ...ternalAggregateProcessAllWindowFunction.java |   4 +-
 .../InternalAggregateProcessWindowFunction.java |   4 +-
 .../functions/InternalWindowFunctionTest.java   |   6 +-
 .../windowing/AllWindowTranslationTest.java     |   7 +-
 .../windowing/WindowTranslationTest.java        |   7 +-
 .../api/scala/WindowTranslationTest.scala       |   4 +-
 16 files changed, 267 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 8fce21c..2c07814 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -116,12 +116,12 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
 			final byte[] valueBytes = backend.db.get(columnFamily, key);
 
 			// deserialize the current accumulator, or create a blank one
-			final ACC accumulator = valueBytes == null ?
+			ACC accumulator = valueBytes == null ?
 					aggFunction.createAccumulator() :
 					valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
 
 			// aggregate the value into the accumulator
-			aggFunction.add(value, accumulator);
+			accumulator = aggFunction.add(value, accumulator);
 
 			// serialize the new accumulator
 			final DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
index 3c79396..78b8d94 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
@@ -127,12 +127,15 @@ public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable
 	ACC createAccumulator();
 
 	/**
-	 * Adds the given value to the given accumulator.
+	 * Adds the given input value to the given accumulator, returning the
+	 * new accumulator value.
+	 *
+	 * <p>For efficiency, the input accumulator may be modified and returned.
 	 * 
 	 * @param value The value to add
 	 * @param accumulator The accumulator to add the value to
 	 */
-	void add(IN value, ACC accumulator);
+	ACC add(IN value, ACC accumulator);
 
 	/**
 	 * Gets the result of the aggregation from the accumulator.

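With the signature change above, callers can no longer rely on `add()` mutating its argument; they have to keep the returned accumulator, which is what the state backend changes in this commit do. A minimal sketch of that calling pattern follows, assuming a hypothetical `AddStep` interface that mimics only the `add()` method (it is not part of Flink, and `ReturnedAccumulatorSketch` and `addAll` are illustrative names).

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the add() method only; not Flink's AggregateFunction. */
interface AddStep<IN, ACC> {
    ACC add(IN value, ACC accumulator);
}

final class ReturnedAccumulatorSketch {

    /** Folds values into an accumulator, always keeping what add() returns. */
    static <IN, ACC> ACC addAll(AddStep<IN, ACC> step, ACC initial, List<IN> values) {
        ACC accumulator = initial;
        for (IN value : values) {
            // Reassign: with an immutable ACC the argument itself never changes.
            accumulator = step.add(value, accumulator);
        }
        return accumulator;
    }

    public static void main(String[] args) {
        List<Long> values = Arrays.asList(11L, 22L, 33L);

        // Immutable accumulator: each call produces a new Long.
        AddStep<Long, Long> immutableSum = (value, acc) -> acc + value;
        System.out.println(addAll(immutableSum, 0L, values));

        // Mutable accumulator: modified in place and returned, as the Javadoc permits.
        AddStep<Long, long[]> mutableSum = (value, acc) -> {
            acc[0] += value;
            return acc;
        };
        System.out.println(addAll(mutableSum, new long[] {0L}, values)[0]);
    }
}
```

Both variants go through the same driver loop, so a caller written against the new signature works whether the accumulator type is mutable or not.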
http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java
index caf2557..dbaf639 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java
@@ -43,7 +43,7 @@ public abstract class RichAggregateFunction<IN, ACC, OUT>
 	public abstract ACC createAccumulator();
 
 	@Override
-	public abstract void add(IN value, ACC accumulator);
+	public abstract ACC add(IN value, ACC accumulator);
 
 	@Override
 	public abstract OUT getResult(ACC accumulator);

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
index d3bffda..330386b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
@@ -42,11 +42,12 @@ class AggregateAggFunction(genAggregations: GeneratedAggregationsFunction)
     function.createAccumulators()
   }
 
-  override def add(value: CRow, accumulatorRow: Row): Unit = {
+  override def add(value: CRow, accumulatorRow: Row): Row = {
     if (function == null) {
       initFunction()
     }
     function.accumulate(accumulatorRow, value.row)
+    accumulatorRow
   }
 
   override def getResult(accumulatorRow: Row): Row = {

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 6df77c0..4d27da2 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -1099,11 +1099,11 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			DataStream<Tuple2<Integer, Long>> source = env
 					.addSource(new TestAscendingValueSource(numElements));
 
-			final AggregatingStateDescriptor<Tuple2<Integer, Long>, MutableString, String> aggrStateDescriptor =
+			final AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> aggrStateDescriptor =
 					new AggregatingStateDescriptor<>(
 							"aggregates",
 							new SumAggr(),
-							MutableString.class);
+							String.class);
 			aggrStateDescriptor.setQueryable("aggr-queryable");
 
 			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
@@ -1291,10 +1291,10 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 		private static final long serialVersionUID = 1L;
 
-		private final AggregatingStateDescriptor<Tuple2<Integer, Long>, MutableString, String> stateDescriptor;
+		private final AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> stateDescriptor;
 		private transient AggregatingState<Tuple2<Integer, Long>, String> state;
 
-		AggregatingTestOperator(AggregatingStateDescriptor<Tuple2<Integer, Long>, MutableString, String> stateDesc) {
+		AggregatingTestOperator(AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> stateDesc) {
 			this.stateDescriptor = stateDesc;
 		}
 
@@ -1316,39 +1316,33 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	/**
 	 * Test {@link AggregateFunction} concatenating the already stored string with the long passed as argument.
 	 */
-	private static class SumAggr implements AggregateFunction<Tuple2<Integer, Long>, MutableString, String> {
+	private static class SumAggr implements AggregateFunction<Tuple2<Integer, Long>, String, String> {
 
 		private static final long serialVersionUID = -6249227626701264599L;
 
 		@Override
-		public MutableString createAccumulator() {
-			return new MutableString();
+		public String createAccumulator() {
+			return "0";
 		}
 
 		@Override
-		public void add(Tuple2<Integer, Long> value, MutableString accumulator) {
-			long acc = Long.valueOf(accumulator.value);
+		public String add(Tuple2<Integer, Long> value, String accumulator) {
+			long acc = Long.valueOf(accumulator);
 			acc += value.f1;
-			accumulator.value = Long.toString(acc);
+			return Long.toString(acc);
 		}
 
 		@Override
-		public String getResult(MutableString accumulator) {
-			return accumulator.value;
+		public String getResult(String accumulator) {
+			return accumulator;
 		}
 
 		@Override
-		public MutableString merge(MutableString a, MutableString b) {
-			MutableString nValue = new MutableString();
-			nValue.value = Long.toString(Long.valueOf(a.value) + Long.valueOf(b.value));
-			return nValue;
+		public String merge(String a, String b) {
+			return Long.toString(Long.valueOf(a) + Long.valueOf(b));
 		}
 	}
 
-	private static final class MutableString {
-		String value = "0";
-	}
-
 	/**
 	 * Test {@link FoldFunction} concatenating the already stored string with the long passed as argument.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
index 69b2f61..2e05f61 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
@@ -36,11 +36,11 @@ import static org.junit.Assert.assertEquals;
  */
 public class ImmutableAggregatingStateTest {
 
-	private final AggregatingStateDescriptor<Long, MutableString, String> aggrStateDesc =
+	private final AggregatingStateDescriptor<Long, String, String> aggrStateDesc =
 			new AggregatingStateDescriptor<>(
 					"test",
 					new SumAggr(),
-					MutableString.class);
+					String.class);
 
 	private ImmutableAggregatingState<Long, String> aggrState;
 
@@ -50,8 +50,7 @@ public class ImmutableAggregatingStateTest {
 			aggrStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
 		}
 
-		final MutableString initValue = new MutableString();
-		initValue.value = "42";
+		final String initValue = "42";
 
 		ByteArrayOutputStream out = new ByteArrayOutputStream();
 		aggrStateDesc.getSerializer().serialize(initValue, new DataOutputViewStreamWrapper(out));
@@ -81,34 +80,29 @@ public class ImmutableAggregatingStateTest {
 	/**
 	 * Test {@link AggregateFunction} concatenating the already stored string with the long passed as argument.
 	 */
-	private static class SumAggr implements AggregateFunction<Long, MutableString, String> {
+	private static class SumAggr implements AggregateFunction<Long, String, String> {
 
 		private static final long serialVersionUID = -6249227626701264599L;
 
 		@Override
-		public MutableString createAccumulator() {
-			return new MutableString();
+		public String createAccumulator() {
+			return "";
 		}
 
 		@Override
-		public void add(Long value, MutableString accumulator) {
-			accumulator.value += ", " + value;
+		public String add(Long value, String accumulator) {
+			accumulator += ", " + value;
+			return accumulator;
 		}
 
 		@Override
-		public String getResult(MutableString accumulator) {
-			return accumulator.value;
+		public String getResult(String accumulator) {
+			return accumulator;
 		}
 
 		@Override
-		public MutableString merge(MutableString a, MutableString b) {
-			MutableString nValue = new MutableString();
-			nValue.value = a.value + ", " + b.value;
-			return nValue;
+		public String merge(String a, String b) {
+			return a + ", " + b;
 		}
 	}
-
-	private static final class MutableString {
-		String value;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
index 64fc1db..3fa8cd4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
@@ -114,8 +114,7 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
 			if (accumulator == null) {
 				accumulator = aggFunction.createAccumulator();
 			}
-			aggFunction.add(value, accumulator);
-			return accumulator;
+			return aggFunction.add(value, accumulator);
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 50b5e26..8f803ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -1713,10 +1713,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	@Test
-	public void testAggregatingStateAddAndGet() throws Exception {
+	public void testAggregatingStateAddAndGetWithMutableAccumulator() throws Exception {
 
 		final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
-			new AggregatingStateDescriptor<>("my-state", new AggregatingAddingFunction(), MutableLong.class);
+			new AggregatingStateDescriptor<>("my-state", new MutableAggregatingAddingFunction(), MutableLong.class);
 		
 		AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
 
@@ -1768,10 +1768,183 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	@Test
-	public void testAggregatingStateMerging() throws Exception {
+	public void testAggregatingStateMergingWithMutableAccumulator() throws Exception {
 
 		final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
-			new AggregatingStateDescriptor<>("my-state", new AggregatingAddingFunction(), MutableLong.class);
+			new AggregatingStateDescriptor<>("my-state", new MutableAggregatingAddingFunction(), MutableLong.class);
+
+		final Integer namespace1 = 1;
+		final Integer namespace2 = 2;
+		final Integer namespace3 = 3;
+
+		final Long expectedResult = 165L;
+
+		AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+		try {
+			InternalAggregatingState<Integer, Long, Long> state =
+				(InternalAggregatingState<Integer, Long, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
+
+			// populate the different namespaces
+			//  - abc spreads the values over three namespaces
+			//  - def spreads the values over two namespaces (one empty)
+			//  - ghi is empty
+			//  - jkl has all elements already in the target namespace
+			//  - mno has all elements already in one source namespace
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(namespace1);
+			state.add(33L);
+			state.add(55L);
+
+			state.setCurrentNamespace(namespace2);
+			state.add(22L);
+			state.add(11L);
+
+			state.setCurrentNamespace(namespace3);
+			state.add(44L);
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(namespace1);
+			state.add(11L);
+			state.add(44L);
+
+			state.setCurrentNamespace(namespace3);
+			state.add(22L);
+			state.add(55L);
+			state.add(33L);
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(namespace1);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(namespace3);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("abc");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("def");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("ghi");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("jkl");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("mno");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			// make sure all lists / maps are cleared
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("ghi");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	@Test
+	public void testAggregatingStateAddAndGetWithImmutableAccumulator() throws Exception {
+
+		final AggregatingStateDescriptor<Long, Long, Long> stateDescr =
+			new AggregatingStateDescriptor<>("my-state", new ImmutableAggregatingAddingFunction(), Long.class);
+
+		AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+		try {
+			AggregatingState<Long, Long> state =
+				keyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+			state.add(17L);
+			state.add(11L);
+			assertEquals(28L, state.get().longValue());
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertNull(state.get());
+			state.add(1L);
+			state.add(2L);
+
+			keyedBackend.setCurrentKey("def");
+			assertEquals(28L, state.get().longValue());
+			state.clear();
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			state.add(3L);
+			state.add(2L);
+			state.add(1L);
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertEquals(9L, state.get().longValue());
+			state.clear();
+
+			// make sure all lists / maps are cleared
+			assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	@Test
+	public void testAggregatingStateMergingWithImmutableAccumulator() throws Exception {
+
+		final AggregatingStateDescriptor<Long, Long, Long> stateDescr =
+			new AggregatingStateDescriptor<>("my-state", new ImmutableAggregatingAddingFunction(), Long.class);
 
 		final Integer namespace1 = 1;
 		final Integer namespace2 = 2;
@@ -3450,7 +3623,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	@SuppressWarnings("serial")
-	private static class AggregatingAddingFunction implements AggregateFunction<Long, MutableLong, Long> {
+	private static class MutableAggregatingAddingFunction implements AggregateFunction<Long, MutableLong, Long> {
 
 		@Override
 		public MutableLong createAccumulator() {
@@ -3458,8 +3631,9 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		}
 
 		@Override
-		public void add(Long value, MutableLong accumulator) {
+		public MutableLong add(Long value, MutableLong accumulator) {
 			accumulator.value += value;
+			return accumulator;
 		}
 
 		@Override
@@ -3474,6 +3648,30 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		}
 	}
 
+	@SuppressWarnings("serial")
+	private static class ImmutableAggregatingAddingFunction implements AggregateFunction<Long, Long, Long> {
+
+		@Override
+		public Long createAccumulator() {
+			return 0L;
+		}
+
+		@Override
+		public Long add(Long value, Long accumulator) {
+			return accumulator + value;
+		}
+
+		@Override
+		public Long getResult(Long accumulator) {
+			return accumulator;
+		}
+
+		@Override
+		public Long merge(Long a, Long b) {
+			return a + b;
+		}
+	}
+
 	private static final class MutableLong {
 		long value;
 	}
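
Note on the merging tests above: they spread values over several namespaces, merge the source namespaces into a target, and expect the combined sum (165) or null when nothing was ever added. A minimal toy model of that behaviour, assuming a plain HashMap in place of a real state backend, could look like the following. NamespaceMergeSketch and its methods are hypothetical and only imitate the calls used in the test; they are not the Flink implementation.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of namespace-scoped aggregating state.
// It is NOT Flink's InternalAggregatingState.
final class NamespaceMergeSketch {

    private final Map<Integer, Long> accByNamespace = new HashMap<>();
    private int currentNamespace;

    void setCurrentNamespace(int namespace) {
        currentNamespace = namespace;
    }

    void add(long value) {
        Long acc = accByNamespace.get(currentNamespace);
        if (acc == null) {
            acc = 0L; // plays the role of createAccumulator()
        }
        // Store the value produced by the add step; with an immutable
        // Long accumulator there is nothing to mutate in place.
        accByNamespace.put(currentNamespace, acc + value);
    }

    // Returns null when the current namespace holds no accumulator.
    Long get() {
        return accByNamespace.get(currentNamespace);
    }

    void mergeNamespaces(int target, List<Integer> sources) {
        Long merged = null;
        for (int source : sources) {
            Long acc = accByNamespace.remove(source);
            if (acc != null) {
                merged = (merged == null) ? acc : merged + acc;
            }
        }
        if (merged != null) {
            // Combine with the target's accumulator, or install it if absent.
            accByNamespace.merge(target, merged, Long::sum);
        }
    }

    int numEntries() {
        return accByNamespace.size();
    }

    public static void main(String[] args) {
        NamespaceMergeSketch state = new NamespaceMergeSketch();
        state.setCurrentNamespace(1);
        state.add(33L);
        state.add(55L);
        state.setCurrentNamespace(2);
        state.add(22L);
        state.add(11L);
        state.setCurrentNamespace(3);
        state.add(44L);

        state.mergeNamespaces(1, Arrays.asList(2, 3));
        state.setCurrentNamespace(1);
        System.out.println(state.get()); // prints 165

        NamespaceMergeSketch empty = new NamespaceMergeSketch();
        empty.mergeNamespaces(1, Arrays.asList(2, 3));
        empty.setCurrentNamespace(1);
        System.out.println(empty.get()); // prints null
    }
}
```

The source namespaces are removed during the merge, so only the target entry remains afterwards; this loosely corresponds to the test's final check that the backend is empty once the target namespace is cleared.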

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java
index e20b878..94f752a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java
@@ -56,10 +56,10 @@ public class AggregateApplyAllWindowFunction<W extends Window, T, ACC, V, R>
 
 	@Override
 	public void apply(W window, Iterable<T> values, Collector<R> out) throws Exception {
-		final ACC acc = aggFunction.createAccumulator();
+		ACC acc = aggFunction.createAccumulator();
 
 		for (T value : values) {
-			aggFunction.add(value, acc);
+			acc = aggFunction.add(value, acc);
 		}
 
 		wrappedFunction.apply(window, Collections.singletonList(aggFunction.getResult(acc)), out);

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java
index 6d2d7f4..cdaa2b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java
@@ -54,10 +54,10 @@ public class AggregateApplyWindowFunction<K, W extends Window, T, ACC, V, R>
 
 	@Override
 	public void apply(K key, W window, Iterable<T> values, Collector<R> out) throws Exception {
-		final ACC acc = aggFunction.createAccumulator();
+		ACC acc = aggFunction.createAccumulator();
 
 		for (T val : values) {
-			aggFunction.add(val, acc);
+			acc = aggFunction.add(val, acc);
 		}
 
 		wrappedFunction.apply(key, window, Collections.singletonList(aggFunction.getResult(acc)), out);

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
index 3d58156..e17412a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
@@ -65,10 +65,10 @@ public final class InternalAggregateProcessAllWindowFunction<T, ACC, V, R, W ext
 
 	@Override
 	public void process(Byte key, final W window, final InternalWindowContext context, Iterable<T> input, Collector<R> out) throws Exception {
-		final ACC acc = aggFunction.createAccumulator();
+		ACC acc = aggFunction.createAccumulator();
 
 		for (T val : input) {
-			aggFunction.add(val, acc);
+			acc = aggFunction.add(val, acc);
 		}
 
 		this.ctx.window = window;

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
index e2dfe3f..c46fa55 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
@@ -59,10 +59,10 @@ public final class InternalAggregateProcessWindowFunction<T, ACC, V, R, K, W ext
 
 	@Override
 	public void process(K key, final W window, final InternalWindowContext context, Iterable<T> input, Collector<R> out) throws Exception {
-		final ACC acc = aggFunction.createAccumulator();
+		ACC acc = aggFunction.createAccumulator();
 
 		for (T val : input) {
-			aggFunction.add(val, acc);
+			acc = aggFunction.add(val, acc);
 		}
 
 		this.ctx.window = window;

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index 7657ce7..dc5b24c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -457,8 +457,9 @@ public class InternalWindowFunctionTest {
 					}
 
 					@Override
-					public void add(Long value, Set<Long> accumulator) {
+					public Set<Long> add(Long value, Set<Long> accumulator) {
 						accumulator.add(value);
+						return accumulator;
 					}
 
 					@Override
@@ -552,8 +553,9 @@ public class InternalWindowFunctionTest {
 					}
 
 					@Override
-					public void add(Long value, Set<Long> accumulator) {
+					public Set<Long> add(Long value, Set<Long> accumulator) {
 						accumulator.add(value);
+						return accumulator;
 					}
 
 					@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index f967a5b..27963d6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -1462,9 +1462,10 @@ public class AllWindowTranslationTest {
 		}
 
 		@Override
-		public void add(Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) {
+		public Tuple2<String, Integer> add(Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) {
 			accumulator.f0 = value.f0;
 			accumulator.f1 = value.f1;
+			return accumulator;
 		}
 
 		@Override
@@ -1486,7 +1487,9 @@ public class AllWindowTranslationTest {
 		}
 
 		@Override
-		public void add(T value, T accumulator) {}
+		public T add(T value, T accumulator) {
+			return accumulator;
+		}
 
 		@Override
 		public T getResult(T accumulator) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 821438e..a1276e9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -1672,9 +1672,10 @@ public class WindowTranslationTest {
 		}
 
 		@Override
-		public void add(Tuple3<String, String, Integer> value, Tuple2<String, Integer> accumulator) {
+		public Tuple2<String, Integer> add(Tuple3<String, String, Integer> value, Tuple2<String, Integer> accumulator) {
 			accumulator.f0 = value.f0;
 			accumulator.f1 = value.f2;
+			return accumulator;
 		}
 
 		@Override
@@ -1696,7 +1697,9 @@ public class WindowTranslationTest {
 		}
 
 		@Override
-		public void add(T value, T accumulator) {}
+		public T add(T value, T accumulator) {
+			return accumulator;
+		}
 
 		@Override
 		public T getResult(T accumulator) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index cc55f0d..916884f 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -1988,7 +1988,7 @@ class DummyAggregator extends AggregateFunction[(String, Int), (String, Int), (S
 
   override def getResult(accumulator: (String, Int)): (String, Int) = accumulator
 
-  override def add(value: (String, Int), accumulator: (String, Int)): Unit = ()
+  override def add(value: (String, Int), accumulator: (String, Int)): (String, Int) = accumulator
 }
 
 class DummyRichAggregator extends RichAggregateFunction[(String, Int), (String, Int), (String, Int)]
@@ -2000,7 +2000,7 @@ class DummyRichAggregator extends RichAggregateFunction[(String, Int), (String,
 
   override def getResult(accumulator: (String, Int)): (String, Int) = accumulator
 
-  override def add(value: (String, Int), accumulator: (String, Int)): Unit = ()
+  override def add(value: (String, Int), accumulator: (String, Int)): (String, Int) = accumulator
 }
 
 class TestWindowFunction