You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/10/21 07:35:15 UTC

[2/2] flink git commit: [FLINK-7837] Extend AggregateFunction.add() to work with immutable types

[FLINK-7837] Extend AggregateFunction.add() to work with immutable types


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f176c917
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f176c917
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f176c917

Branch: refs/heads/master
Commit: f176c917d56fa682c047930be9e579b635d7268b
Parents: ebc3bc1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 13 19:17:01 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Oct 21 09:33:38 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBAggregatingState.java          |   4 +-
 .../api/common/functions/AggregateFunction.java |   7 +-
 .../common/functions/RichAggregateFunction.java |   2 +-
 .../aggregate/AggregateAggFunction.scala        |   3 +-
 .../itcases/AbstractQueryableStateTestBase.java |  34 ++-
 .../state/ImmutableAggregatingStateTest.java    |  32 ++-
 .../state/heap/HeapAggregatingState.java        |   5 +-
 .../runtime/state/StateBackendTestBase.java     | 210 ++++++++++++++++++-
 .../AggregateApplyAllWindowFunction.java        |   4 +-
 .../windowing/AggregateApplyWindowFunction.java |   4 +-
 ...ternalAggregateProcessAllWindowFunction.java |   4 +-
 .../InternalAggregateProcessWindowFunction.java |   4 +-
 .../functions/InternalWindowFunctionTest.java   |   6 +-
 .../windowing/AllWindowTranslationTest.java     |   7 +-
 .../windowing/WindowTranslationTest.java        |   7 +-
 .../api/scala/WindowTranslationTest.scala       |   4 +-
 16 files changed, 267 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 8fce21c..2c07814 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -116,12 +116,12 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
 			final byte[] valueBytes = backend.db.get(columnFamily, key);
 
 			// deserialize the current accumulator, or create a blank one
-			final ACC accumulator = valueBytes == null ?
+			ACC accumulator = valueBytes == null ?
 					aggFunction.createAccumulator() :
 					valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
 
 			// aggregate the value into the accumulator
-			aggFunction.add(value, accumulator);
+			accumulator = aggFunction.add(value, accumulator);
 
 			// serialize the new accumulator
 			final DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
index 3c79396..78b8d94 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
@@ -127,12 +127,15 @@ public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable
 	ACC createAccumulator();
 
 	/**
-	 * Adds the given value to the given accumulator.
+	 * Adds the given input value to the given accumulator, returning the
+	 * new accumulator value.
+	 *
+	 * <p>For efficiency, the input accumulator may be modified and returned.
 	 * 
 	 * @param value The value to add
 	 * @param accumulator The accumulator to add the value to
 	 */
-	void add(IN value, ACC accumulator);
+	ACC add(IN value, ACC accumulator);
 
 	/**
 	 * Gets the result of the aggregation from the accumulator.

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java
index caf2557..dbaf639 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java
@@ -43,7 +43,7 @@ public abstract class RichAggregateFunction<IN, ACC, OUT>
 	public abstract ACC createAccumulator();
 
 	@Override
-	public abstract void add(IN value, ACC accumulator);
+	public abstract ACC add(IN value, ACC accumulator);
 
 	@Override
 	public abstract OUT getResult(ACC accumulator);

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
index d3bffda..330386b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
@@ -42,11 +42,12 @@ class AggregateAggFunction(genAggregations: GeneratedAggregationsFunction)
     function.createAccumulators()
   }
 
-  override def add(value: CRow, accumulatorRow: Row): Unit = {
+  override def add(value: CRow, accumulatorRow: Row): Row = {
     if (function == null) {
       initFunction()
     }
     function.accumulate(accumulatorRow, value.row)
+    accumulatorRow
   }
 
   override def getResult(accumulatorRow: Row): Row = {

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 6df77c0..4d27da2 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -1099,11 +1099,11 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			DataStream<Tuple2<Integer, Long>> source = env
 					.addSource(new TestAscendingValueSource(numElements));
 
-			final AggregatingStateDescriptor<Tuple2<Integer, Long>, MutableString, String> aggrStateDescriptor =
+			final AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> aggrStateDescriptor =
 					new AggregatingStateDescriptor<>(
 							"aggregates",
 							new SumAggr(),
-							MutableString.class);
+							String.class);
 			aggrStateDescriptor.setQueryable("aggr-queryable");
 
 			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
@@ -1291,10 +1291,10 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 		private static final long serialVersionUID = 1L;
 
-		private final AggregatingStateDescriptor<Tuple2<Integer, Long>, MutableString, String> stateDescriptor;
+		private final AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> stateDescriptor;
 		private transient AggregatingState<Tuple2<Integer, Long>, String> state;
 
-		AggregatingTestOperator(AggregatingStateDescriptor<Tuple2<Integer, Long>, MutableString, String> stateDesc) {
+		AggregatingTestOperator(AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> stateDesc) {
 			this.stateDescriptor = stateDesc;
 		}
 
@@ -1316,39 +1316,33 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	/**
 	 * Test {@link AggregateFunction} concatenating the already stored string with the long passed as argument.
 	 */
-	private static class SumAggr implements AggregateFunction<Tuple2<Integer, Long>, MutableString, String> {
+	private static class SumAggr implements AggregateFunction<Tuple2<Integer, Long>, String, String> {
 
 		private static final long serialVersionUID = -6249227626701264599L;
 
 		@Override
-		public MutableString createAccumulator() {
-			return new MutableString();
+		public String createAccumulator() {
+			return "0";
 		}
 
 		@Override
-		public void add(Tuple2<Integer, Long> value, MutableString accumulator) {
-			long acc = Long.valueOf(accumulator.value);
+		public String add(Tuple2<Integer, Long> value, String accumulator) {
+			long acc = Long.valueOf(accumulator);
 			acc += value.f1;
-			accumulator.value = Long.toString(acc);
+			return Long.toString(acc);
 		}
 
 		@Override
-		public String getResult(MutableString accumulator) {
-			return accumulator.value;
+		public String getResult(String accumulator) {
+			return accumulator;
 		}
 
 		@Override
-		public MutableString merge(MutableString a, MutableString b) {
-			MutableString nValue = new MutableString();
-			nValue.value = Long.toString(Long.valueOf(a.value) + Long.valueOf(b.value));
-			return nValue;
+		public String merge(String a, String b) {
+			return Long.toString(Long.valueOf(a) + Long.valueOf(b));
 		}
 	}
 
-	private static final class MutableString {
-		String value = "0";
-	}
-
 	/**
 	 * Test {@link FoldFunction} concatenating the already stored string with the long passed as argument.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
index 69b2f61..2e05f61 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
@@ -36,11 +36,11 @@ import static org.junit.Assert.assertEquals;
  */
 public class ImmutableAggregatingStateTest {
 
-	private final AggregatingStateDescriptor<Long, MutableString, String> aggrStateDesc =
+	private final AggregatingStateDescriptor<Long, String, String> aggrStateDesc =
 			new AggregatingStateDescriptor<>(
 					"test",
 					new SumAggr(),
-					MutableString.class);
+					String.class);
 
 	private ImmutableAggregatingState<Long, String> aggrState;
 
@@ -50,8 +50,7 @@ public class ImmutableAggregatingStateTest {
 			aggrStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
 		}
 
-		final MutableString initValue = new MutableString();
-		initValue.value = "42";
+		final String initValue = "42";
 
 		ByteArrayOutputStream out = new ByteArrayOutputStream();
 		aggrStateDesc.getSerializer().serialize(initValue, new DataOutputViewStreamWrapper(out));
@@ -81,34 +80,29 @@ public class ImmutableAggregatingStateTest {
 	/**
 	 * Test {@link AggregateFunction} concatenating the already stored string with the long passed as argument.
 	 */
-	private static class SumAggr implements AggregateFunction<Long, MutableString, String> {
+	private static class SumAggr implements AggregateFunction<Long, String, String> {
 
 		private static final long serialVersionUID = -6249227626701264599L;
 
 		@Override
-		public MutableString createAccumulator() {
-			return new MutableString();
+		public String createAccumulator() {
+			return "";
 		}
 
 		@Override
-		public void add(Long value, MutableString accumulator) {
-			accumulator.value += ", " + value;
+		public String add(Long value, String accumulator) {
+			accumulator += ", " + value;
+			return accumulator;
 		}
 
 		@Override
-		public String getResult(MutableString accumulator) {
-			return accumulator.value;
+		public String getResult(String accumulator) {
+			return accumulator;
 		}
 
 		@Override
-		public MutableString merge(MutableString a, MutableString b) {
-			MutableString nValue = new MutableString();
-			nValue.value = a.value + ", " + b.value;
-			return nValue;
+		public String merge(String a, String b) {
+			return a + ", " + b;
 		}
 	}
-
-	private static final class MutableString {
-		String value;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
index 64fc1db..3fa8cd4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
@@ -114,8 +114,7 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
 			if (accumulator == null) {
 				accumulator = aggFunction.createAccumulator();
 			}
-			aggFunction.add(value, accumulator);
-			return accumulator;
+			return aggFunction.add(value, accumulator);
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 50b5e26..8f803ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -1713,10 +1713,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	@Test
-	public void testAggregatingStateAddAndGet() throws Exception {
+	public void testAggregatingStateAddAndGetWithMutableAccumulator() throws Exception {
 
 		final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
-			new AggregatingStateDescriptor<>("my-state", new AggregatingAddingFunction(), MutableLong.class);
+			new AggregatingStateDescriptor<>("my-state", new MutableAggregatingAddingFunction(), MutableLong.class);
 		
 		AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
 
@@ -1768,10 +1768,183 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	@Test
-	public void testAggregatingStateMerging() throws Exception {
+	public void testAggregatingStateMergingWithMutableAccumulator() throws Exception {
 
 		final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
-			new AggregatingStateDescriptor<>("my-state", new AggregatingAddingFunction(), MutableLong.class);
+			new AggregatingStateDescriptor<>("my-state", new MutableAggregatingAddingFunction(), MutableLong.class);
+
+		final Integer namespace1 = 1;
+		final Integer namespace2 = 2;
+		final Integer namespace3 = 3;
+
+		final Long expectedResult = 165L;
+
+		AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+		try {
+			InternalAggregatingState<Integer, Long, Long> state =
+				(InternalAggregatingState<Integer, Long, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
+
+			// populate the different namespaces
+			//  - abc spreads the values over three namespaces
+			//  - def spreads the values over two namespaces (one empty)
+			//  - ghi is empty
+			//  - jkl has all elements already in the target namespace
+			//  - mno has all elements already in one source namespace
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(namespace1);
+			state.add(33L);
+			state.add(55L);
+
+			state.setCurrentNamespace(namespace2);
+			state.add(22L);
+			state.add(11L);
+
+			state.setCurrentNamespace(namespace3);
+			state.add(44L);
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(namespace1);
+			state.add(11L);
+			state.add(44L);
+
+			state.setCurrentNamespace(namespace3);
+			state.add(22L);
+			state.add(55L);
+			state.add(33L);
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(namespace1);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(namespace3);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("abc");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("def");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("ghi");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("jkl");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("mno");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			// make sure all lists / maps are cleared
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("ghi");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	@Test
+	public void testAggregatingStateAddAndGetWithImmutableAccumulator() throws Exception {
+
+		final AggregatingStateDescriptor<Long, Long, Long> stateDescr =
+			new AggregatingStateDescriptor<>("my-state", new ImmutableAggregatingAddingFunction(), Long.class);
+
+		AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+		try {
+			AggregatingState<Long, Long> state =
+				keyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+			state.add(17L);
+			state.add(11L);
+			assertEquals(28L, state.get().longValue());
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertNull(state.get());
+			state.add(1L);
+			state.add(2L);
+
+			keyedBackend.setCurrentKey("def");
+			assertEquals(28L, state.get().longValue());
+			state.clear();
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			state.add(3L);
+			state.add(2L);
+			state.add(1L);
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertEquals(9L, state.get().longValue());
+			state.clear();
+
+			// make sure all lists / maps are cleared
+			assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	@Test
+	public void testAggregatingStateMergingWithImmutableAccumulator() throws Exception {
+
+		final AggregatingStateDescriptor<Long, Long, Long> stateDescr =
+			new AggregatingStateDescriptor<>("my-state", new ImmutableAggregatingAddingFunction(), Long.class);
 
 		final Integer namespace1 = 1;
 		final Integer namespace2 = 2;
@@ -3450,7 +3623,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	@SuppressWarnings("serial")
-	private static class AggregatingAddingFunction implements AggregateFunction<Long, MutableLong, Long> {
+	private static class MutableAggregatingAddingFunction implements AggregateFunction<Long, MutableLong, Long> {
 
 		@Override
 		public MutableLong createAccumulator() {
@@ -3458,8 +3631,9 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		}
 
 		@Override
-		public void add(Long value, MutableLong accumulator) {
+		public MutableLong add(Long value, MutableLong accumulator) {
 			accumulator.value += value;
+			return accumulator;
 		}
 
 		@Override
@@ -3474,6 +3648,30 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		}
 	}
 
+	@SuppressWarnings("serial")
+	private static class ImmutableAggregatingAddingFunction implements AggregateFunction<Long, Long, Long> {
+
+		@Override
+		public Long createAccumulator() {
+			return 0L;
+		}
+
+		@Override
+		public Long add(Long value, Long accumulator) {
+			return accumulator += value;
+		}
+
+		@Override
+		public Long getResult(Long accumulator) {
+			return accumulator;
+		}
+
+		@Override
+		public Long merge(Long a, Long b) {
+			return a + b;
+		}
+	}
+
 	private static final class MutableLong {
 		long value;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java
index e20b878..94f752a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java
@@ -56,10 +56,10 @@ public class AggregateApplyAllWindowFunction<W extends Window, T, ACC, V, R>
 
 	@Override
 	public void apply(W window, Iterable<T> values, Collector<R> out) throws Exception {
-		final ACC acc = aggFunction.createAccumulator();
+		ACC acc = aggFunction.createAccumulator();
 
 		for (T value : values) {
-			aggFunction.add(value, acc);
+			acc = aggFunction.add(value, acc);
 		}
 
 		wrappedFunction.apply(window, Collections.singletonList(aggFunction.getResult(acc)), out);

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java
index 6d2d7f4..cdaa2b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java
@@ -54,10 +54,10 @@ public class AggregateApplyWindowFunction<K, W extends Window, T, ACC, V, R>
 
 	@Override
 	public void apply(K key, W window, Iterable<T> values, Collector<R> out) throws Exception {
-		final ACC acc = aggFunction.createAccumulator();
+		ACC acc = aggFunction.createAccumulator();
 
 		for (T val : values) {
-			aggFunction.add(val, acc);
+			acc = aggFunction.add(val, acc);
 		}
 
 		wrappedFunction.apply(key, window, Collections.singletonList(aggFunction.getResult(acc)), out);

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
index 3d58156..e17412a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
@@ -65,10 +65,10 @@ public final class InternalAggregateProcessAllWindowFunction<T, ACC, V, R, W ext
 
 	@Override
 	public void process(Byte key, final W window, final InternalWindowContext context, Iterable<T> input, Collector<R> out) throws Exception {
-		final ACC acc = aggFunction.createAccumulator();
+		ACC acc = aggFunction.createAccumulator();
 
 		for (T val : input) {
-			aggFunction.add(val, acc);
+			acc = aggFunction.add(val, acc);
 		}
 
 		this.ctx.window = window;

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
index e2dfe3f..c46fa55 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
@@ -59,10 +59,10 @@ public final class InternalAggregateProcessWindowFunction<T, ACC, V, R, K, W ext
 
 	@Override
 	public void process(K key, final W window, final InternalWindowContext context, Iterable<T> input, Collector<R> out) throws Exception {
-		final ACC acc = aggFunction.createAccumulator();
+		ACC acc = aggFunction.createAccumulator();
 
 		for (T val : input) {
-			aggFunction.add(val, acc);
+			acc = aggFunction.add(val, acc);
 		}
 
 		this.ctx.window = window;

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index 7657ce7..dc5b24c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -457,8 +457,9 @@ public class InternalWindowFunctionTest {
 					}
 
 					@Override
-					public void add(Long value, Set<Long> accumulator) {
+					public Set<Long> add(Long value, Set<Long> accumulator) {
 						accumulator.add(value);
+						return accumulator;
 					}
 
 					@Override
@@ -552,8 +553,9 @@ public class InternalWindowFunctionTest {
 					}
 
 					@Override
-					public void add(Long value, Set<Long> accumulator) {
+					public Set<Long> add(Long value, Set<Long> accumulator) {
 						accumulator.add(value);
+						return accumulator;
 					}
 
 					@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index f967a5b..27963d6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -1462,9 +1462,10 @@ public class AllWindowTranslationTest {
 		}
 
 		@Override
-		public void add(Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) {
+		public Tuple2<String, Integer> add(Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) {
 			accumulator.f0 = value.f0;
 			accumulator.f1 = value.f1;
+			return accumulator;
 		}
 
 		@Override
@@ -1486,7 +1487,9 @@ public class AllWindowTranslationTest {
 		}
 
 		@Override
-		public void add(T value, T accumulator) {}
+		public T add(T value, T accumulator) {
+			return accumulator;
+		}
 
 		@Override
 		public T getResult(T accumulator) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 821438e..a1276e9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -1672,9 +1672,10 @@ public class WindowTranslationTest {
 		}
 
 		@Override
-		public void add(Tuple3<String, String, Integer> value, Tuple2<String, Integer> accumulator) {
+		public Tuple2<String, Integer> add(Tuple3<String, String, Integer> value, Tuple2<String, Integer> accumulator) {
 			accumulator.f0 = value.f0;
 			accumulator.f1 = value.f2;
+			return accumulator;
 		}
 
 		@Override
@@ -1696,7 +1697,9 @@ public class WindowTranslationTest {
 		}
 
 		@Override
-		public void add(T value, T accumulator) {}
+		public T add(T value, T accumulator) {
+			return accumulator;
+		}
 
 		@Override
 		public T getResult(T accumulator) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index cc55f0d..916884f 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -1988,7 +1988,7 @@ class DummyAggregator extends AggregateFunction[(String, Int), (String, Int), (S
 
   override def getResult(accumulator: (String, Int)): (String, Int) = accumulator
 
-  override def add(value: (String, Int), accumulator: (String, Int)): Unit = ()
+  override def add(value: (String, Int), accumulator: (String, Int)): (String, Int) = accumulator
 }
 
 class DummyRichAggregator extends RichAggregateFunction[(String, Int), (String, Int), (String, Int)]
@@ -2000,7 +2000,7 @@ class DummyRichAggregator extends RichAggregateFunction[(String, Int), (String,
 
   override def getResult(accumulator: (String, Int)): (String, Int) = accumulator
 
-  override def add(value: (String, Int), accumulator: (String, Int)): Unit = ()
+  override def add(value: (String, Int), accumulator: (String, Int)): (String, Int) = accumulator
 }
 
 class TestWindowFunction