You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/10/21 07:35:15 UTC
[2/2] flink git commit: [FLINK-7837] Extend AggregateFunction.add()
to work with immutable types
[FLINK-7837] Extend AggregateFunction.add() to work with immutable types
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f176c917
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f176c917
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f176c917
Branch: refs/heads/master
Commit: f176c917d56fa682c047930be9e579b635d7268b
Parents: ebc3bc1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 13 19:17:01 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Oct 21 09:33:38 2017 +0200
----------------------------------------------------------------------
.../state/RocksDBAggregatingState.java | 4 +-
.../api/common/functions/AggregateFunction.java | 7 +-
.../common/functions/RichAggregateFunction.java | 2 +-
.../aggregate/AggregateAggFunction.scala | 3 +-
.../itcases/AbstractQueryableStateTestBase.java | 34 ++-
.../state/ImmutableAggregatingStateTest.java | 32 ++-
.../state/heap/HeapAggregatingState.java | 5 +-
.../runtime/state/StateBackendTestBase.java | 210 ++++++++++++++++++-
.../AggregateApplyAllWindowFunction.java | 4 +-
.../windowing/AggregateApplyWindowFunction.java | 4 +-
...ternalAggregateProcessAllWindowFunction.java | 4 +-
.../InternalAggregateProcessWindowFunction.java | 4 +-
.../functions/InternalWindowFunctionTest.java | 6 +-
.../windowing/AllWindowTranslationTest.java | 7 +-
.../windowing/WindowTranslationTest.java | 7 +-
.../api/scala/WindowTranslationTest.scala | 4 +-
16 files changed, 267 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 8fce21c..2c07814 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -116,12 +116,12 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
final byte[] valueBytes = backend.db.get(columnFamily, key);
// deserialize the current accumulator, or create a blank one
- final ACC accumulator = valueBytes == null ?
+ ACC accumulator = valueBytes == null ?
aggFunction.createAccumulator() :
valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
// aggregate the value into the accumulator
- aggFunction.add(value, accumulator);
+ accumulator = aggFunction.add(value, accumulator);
// serialize the new accumulator
final DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
index 3c79396..78b8d94 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
@@ -127,12 +127,15 @@ public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable
ACC createAccumulator();
/**
- * Adds the given value to the given accumulator.
+ * Adds the given input value to the given accumulator, returning the
+ * new accumulator value.
+ *
+ * <p>For efficiency, the input accumulator may be modified and returned.
*
* @param value The value to add
* @param accumulator The accumulator to add the value to
*/
- void add(IN value, ACC accumulator);
+ ACC add(IN value, ACC accumulator);
/**
* Gets the result of the aggregation from the accumulator.
http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java
index caf2557..dbaf639 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichAggregateFunction.java
@@ -43,7 +43,7 @@ public abstract class RichAggregateFunction<IN, ACC, OUT>
public abstract ACC createAccumulator();
@Override
- public abstract void add(IN value, ACC accumulator);
+ public abstract ACC add(IN value, ACC accumulator);
@Override
public abstract OUT getResult(ACC accumulator);
http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
index d3bffda..330386b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
@@ -42,11 +42,12 @@ class AggregateAggFunction(genAggregations: GeneratedAggregationsFunction)
function.createAccumulators()
}
- override def add(value: CRow, accumulatorRow: Row): Unit = {
+ override def add(value: CRow, accumulatorRow: Row): Row = {
if (function == null) {
initFunction()
}
function.accumulate(accumulatorRow, value.row)
+ accumulatorRow
}
override def getResult(accumulatorRow: Row): Row = {
http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 6df77c0..4d27da2 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -1099,11 +1099,11 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
DataStream<Tuple2<Integer, Long>> source = env
.addSource(new TestAscendingValueSource(numElements));
- final AggregatingStateDescriptor<Tuple2<Integer, Long>, MutableString, String> aggrStateDescriptor =
+ final AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> aggrStateDescriptor =
new AggregatingStateDescriptor<>(
"aggregates",
new SumAggr(),
- MutableString.class);
+ String.class);
aggrStateDescriptor.setQueryable("aggr-queryable");
source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
@@ -1291,10 +1291,10 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
private static final long serialVersionUID = 1L;
- private final AggregatingStateDescriptor<Tuple2<Integer, Long>, MutableString, String> stateDescriptor;
+ private final AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> stateDescriptor;
private transient AggregatingState<Tuple2<Integer, Long>, String> state;
- AggregatingTestOperator(AggregatingStateDescriptor<Tuple2<Integer, Long>, MutableString, String> stateDesc) {
+ AggregatingTestOperator(AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> stateDesc) {
this.stateDescriptor = stateDesc;
}
@@ -1316,39 +1316,33 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
/**
* Test {@link AggregateFunction} concatenating the already stored string with the long passed as argument.
*/
- private static class SumAggr implements AggregateFunction<Tuple2<Integer, Long>, MutableString, String> {
+ private static class SumAggr implements AggregateFunction<Tuple2<Integer, Long>, String, String> {
private static final long serialVersionUID = -6249227626701264599L;
@Override
- public MutableString createAccumulator() {
- return new MutableString();
+ public String createAccumulator() {
+ return "0";
}
@Override
- public void add(Tuple2<Integer, Long> value, MutableString accumulator) {
- long acc = Long.valueOf(accumulator.value);
+ public String add(Tuple2<Integer, Long> value, String accumulator) {
+ long acc = Long.valueOf(accumulator);
acc += value.f1;
- accumulator.value = Long.toString(acc);
+ return Long.toString(acc);
}
@Override
- public String getResult(MutableString accumulator) {
- return accumulator.value;
+ public String getResult(String accumulator) {
+ return accumulator;
}
@Override
- public MutableString merge(MutableString a, MutableString b) {
- MutableString nValue = new MutableString();
- nValue.value = Long.toString(Long.valueOf(a.value) + Long.valueOf(b.value));
- return nValue;
+ public String merge(String a, String b) {
+ return Long.toString(Long.valueOf(a) + Long.valueOf(b));
}
}
- private static final class MutableString {
- String value = "0";
- }
-
/**
* Test {@link FoldFunction} concatenating the already stored string with the long passed as argument.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
index 69b2f61..2e05f61 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
@@ -36,11 +36,11 @@ import static org.junit.Assert.assertEquals;
*/
public class ImmutableAggregatingStateTest {
- private final AggregatingStateDescriptor<Long, MutableString, String> aggrStateDesc =
+ private final AggregatingStateDescriptor<Long, String, String> aggrStateDesc =
new AggregatingStateDescriptor<>(
"test",
new SumAggr(),
- MutableString.class);
+ String.class);
private ImmutableAggregatingState<Long, String> aggrState;
@@ -50,8 +50,7 @@ public class ImmutableAggregatingStateTest {
aggrStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
}
- final MutableString initValue = new MutableString();
- initValue.value = "42";
+ final String initValue = "42";
ByteArrayOutputStream out = new ByteArrayOutputStream();
aggrStateDesc.getSerializer().serialize(initValue, new DataOutputViewStreamWrapper(out));
@@ -81,34 +80,29 @@ public class ImmutableAggregatingStateTest {
/**
* Test {@link AggregateFunction} concatenating the already stored string with the long passed as argument.
*/
- private static class SumAggr implements AggregateFunction<Long, MutableString, String> {
+ private static class SumAggr implements AggregateFunction<Long, String, String> {
private static final long serialVersionUID = -6249227626701264599L;
@Override
- public MutableString createAccumulator() {
- return new MutableString();
+ public String createAccumulator() {
+ return "";
}
@Override
- public void add(Long value, MutableString accumulator) {
- accumulator.value += ", " + value;
+ public String add(Long value, String accumulator) {
+ accumulator += ", " + value;
+ return accumulator;
}
@Override
- public String getResult(MutableString accumulator) {
- return accumulator.value;
+ public String getResult(String accumulator) {
+ return accumulator;
}
@Override
- public MutableString merge(MutableString a, MutableString b) {
- MutableString nValue = new MutableString();
- nValue.value = a.value + ", " + b.value;
- return nValue;
+ public String merge(String a, String b) {
+ return a + ", " + b;
}
}
-
- private static final class MutableString {
- String value;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
index 64fc1db..3fa8cd4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
@@ -114,8 +114,7 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
if (accumulator == null) {
accumulator = aggFunction.createAccumulator();
}
- aggFunction.add(value, accumulator);
- return accumulator;
+ return aggFunction.add(value, accumulator);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 50b5e26..8f803ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -1713,10 +1713,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
@Test
- public void testAggregatingStateAddAndGet() throws Exception {
+ public void testAggregatingStateAddAndGetWithMutableAccumulator() throws Exception {
final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
- new AggregatingStateDescriptor<>("my-state", new AggregatingAddingFunction(), MutableLong.class);
+ new AggregatingStateDescriptor<>("my-state", new MutableAggregatingAddingFunction(), MutableLong.class);
AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
@@ -1768,10 +1768,183 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
@Test
- public void testAggregatingStateMerging() throws Exception {
+ public void testAggregatingStateMergingWithMutableAccumulator() throws Exception {
final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
- new AggregatingStateDescriptor<>("my-state", new AggregatingAddingFunction(), MutableLong.class);
+ new AggregatingStateDescriptor<>("my-state", new MutableAggregatingAddingFunction(), MutableLong.class);
+
+ final Integer namespace1 = 1;
+ final Integer namespace2 = 2;
+ final Integer namespace3 = 3;
+
+ final Long expectedResult = 165L;
+
+ AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+ try {
+ InternalAggregatingState<Integer, Long, Long> state =
+ (InternalAggregatingState<Integer, Long, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
+
+ // populate the different namespaces
+ // - abc spreads the values over three namespaces
+ // - def spreads the values over two namespaces (one empty)
+ // - ghi is empty
+ // - jkl has all elements already in the target namespace
+ // - mno has all elements already in one source namespace
+
+ keyedBackend.setCurrentKey("abc");
+ state.setCurrentNamespace(namespace1);
+ state.add(33L);
+ state.add(55L);
+
+ state.setCurrentNamespace(namespace2);
+ state.add(22L);
+ state.add(11L);
+
+ state.setCurrentNamespace(namespace3);
+ state.add(44L);
+
+ keyedBackend.setCurrentKey("def");
+ state.setCurrentNamespace(namespace1);
+ state.add(11L);
+ state.add(44L);
+
+ state.setCurrentNamespace(namespace3);
+ state.add(22L);
+ state.add(55L);
+ state.add(33L);
+
+ keyedBackend.setCurrentKey("jkl");
+ state.setCurrentNamespace(namespace1);
+ state.add(11L);
+ state.add(22L);
+ state.add(33L);
+ state.add(44L);
+ state.add(55L);
+
+ keyedBackend.setCurrentKey("mno");
+ state.setCurrentNamespace(namespace3);
+ state.add(11L);
+ state.add(22L);
+ state.add(33L);
+ state.add(44L);
+ state.add(55L);
+
+ keyedBackend.setCurrentKey("abc");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ keyedBackend.setCurrentKey("def");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ keyedBackend.setCurrentKey("ghi");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("jkl");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ keyedBackend.setCurrentKey("mno");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ // make sure all lists / maps are cleared
+
+ keyedBackend.setCurrentKey("abc");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("def");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("ghi");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("jkl");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("mno");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+ }
+ finally {
+ keyedBackend.close();
+ keyedBackend.dispose();
+ }
+ }
+
+ @Test
+ public void testAggregatingStateAddAndGetWithImmutableAccumulator() throws Exception {
+
+ final AggregatingStateDescriptor<Long, Long, Long> stateDescr =
+ new AggregatingStateDescriptor<>("my-state", new ImmutableAggregatingAddingFunction(), Long.class);
+
+ AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+ try {
+ AggregatingState<Long, Long> state =
+ keyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);
+
+ keyedBackend.setCurrentKey("abc");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("def");
+ assertNull(state.get());
+ state.add(17L);
+ state.add(11L);
+ assertEquals(28L, state.get().longValue());
+
+ keyedBackend.setCurrentKey("abc");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ assertNull(state.get());
+ state.add(1L);
+ state.add(2L);
+
+ keyedBackend.setCurrentKey("def");
+ assertEquals(28L, state.get().longValue());
+ state.clear();
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ state.add(3L);
+ state.add(2L);
+ state.add(1L);
+
+ keyedBackend.setCurrentKey("def");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ assertEquals(9L, state.get().longValue());
+ state.clear();
+
+ // make sure all lists / maps are cleared
+ assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+ }
+ finally {
+ keyedBackend.close();
+ keyedBackend.dispose();
+ }
+ }
+
+ @Test
+ public void testAggregatingStateMergingWithImmutableAccumulator() throws Exception {
+
+ final AggregatingStateDescriptor<Long, Long, Long> stateDescr =
+ new AggregatingStateDescriptor<>("my-state", new ImmutableAggregatingAddingFunction(), Long.class);
final Integer namespace1 = 1;
final Integer namespace2 = 2;
@@ -3450,7 +3623,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
@SuppressWarnings("serial")
- private static class AggregatingAddingFunction implements AggregateFunction<Long, MutableLong, Long> {
+ private static class MutableAggregatingAddingFunction implements AggregateFunction<Long, MutableLong, Long> {
@Override
public MutableLong createAccumulator() {
@@ -3458,8 +3631,9 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
@Override
- public void add(Long value, MutableLong accumulator) {
+ public MutableLong add(Long value, MutableLong accumulator) {
accumulator.value += value;
+ return accumulator;
}
@Override
@@ -3474,6 +3648,30 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
}
+ @SuppressWarnings("serial")
+ private static class ImmutableAggregatingAddingFunction implements AggregateFunction<Long, Long, Long> {
+
+ @Override
+ public Long createAccumulator() {
+ return 0L;
+ }
+
+ @Override
+ public Long add(Long value, Long accumulator) {
+ return accumulator += value;
+ }
+
+ @Override
+ public Long getResult(Long accumulator) {
+ return accumulator;
+ }
+
+ @Override
+ public Long merge(Long a, Long b) {
+ return a + b;
+ }
+ }
+
private static final class MutableLong {
long value;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java
index e20b878..94f752a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java
@@ -56,10 +56,10 @@ public class AggregateApplyAllWindowFunction<W extends Window, T, ACC, V, R>
@Override
public void apply(W window, Iterable<T> values, Collector<R> out) throws Exception {
- final ACC acc = aggFunction.createAccumulator();
+ ACC acc = aggFunction.createAccumulator();
for (T value : values) {
- aggFunction.add(value, acc);
+ acc = aggFunction.add(value, acc);
}
wrappedFunction.apply(window, Collections.singletonList(aggFunction.getResult(acc)), out);
http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java
index 6d2d7f4..cdaa2b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java
@@ -54,10 +54,10 @@ public class AggregateApplyWindowFunction<K, W extends Window, T, ACC, V, R>
@Override
public void apply(K key, W window, Iterable<T> values, Collector<R> out) throws Exception {
- final ACC acc = aggFunction.createAccumulator();
+ ACC acc = aggFunction.createAccumulator();
for (T val : values) {
- aggFunction.add(val, acc);
+ acc = aggFunction.add(val, acc);
}
wrappedFunction.apply(key, window, Collections.singletonList(aggFunction.getResult(acc)), out);
http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
index 3d58156..e17412a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
@@ -65,10 +65,10 @@ public final class InternalAggregateProcessAllWindowFunction<T, ACC, V, R, W ext
@Override
public void process(Byte key, final W window, final InternalWindowContext context, Iterable<T> input, Collector<R> out) throws Exception {
- final ACC acc = aggFunction.createAccumulator();
+ ACC acc = aggFunction.createAccumulator();
for (T val : input) {
- aggFunction.add(val, acc);
+ acc = aggFunction.add(val, acc);
}
this.ctx.window = window;
http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
index e2dfe3f..c46fa55 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
@@ -59,10 +59,10 @@ public final class InternalAggregateProcessWindowFunction<T, ACC, V, R, K, W ext
@Override
public void process(K key, final W window, final InternalWindowContext context, Iterable<T> input, Collector<R> out) throws Exception {
- final ACC acc = aggFunction.createAccumulator();
+ ACC acc = aggFunction.createAccumulator();
for (T val : input) {
- aggFunction.add(val, acc);
+ acc = aggFunction.add(val, acc);
}
this.ctx.window = window;
http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index 7657ce7..dc5b24c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -457,8 +457,9 @@ public class InternalWindowFunctionTest {
}
@Override
- public void add(Long value, Set<Long> accumulator) {
+ public Set<Long> add(Long value, Set<Long> accumulator) {
accumulator.add(value);
+ return accumulator;
}
@Override
@@ -552,8 +553,9 @@ public class InternalWindowFunctionTest {
}
@Override
- public void add(Long value, Set<Long> accumulator) {
+ public Set<Long> add(Long value, Set<Long> accumulator) {
accumulator.add(value);
+ return accumulator;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index f967a5b..27963d6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -1462,9 +1462,10 @@ public class AllWindowTranslationTest {
}
@Override
- public void add(Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) {
+ public Tuple2<String, Integer> add(Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) {
accumulator.f0 = value.f0;
accumulator.f1 = value.f1;
+ return accumulator;
}
@Override
@@ -1486,7 +1487,9 @@ public class AllWindowTranslationTest {
}
@Override
- public void add(T value, T accumulator) {}
+ public T add(T value, T accumulator) {
+ return accumulator;
+ }
@Override
public T getResult(T accumulator) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 821438e..a1276e9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -1672,9 +1672,10 @@ public class WindowTranslationTest {
}
@Override
- public void add(Tuple3<String, String, Integer> value, Tuple2<String, Integer> accumulator) {
+ public Tuple2<String, Integer> add(Tuple3<String, String, Integer> value, Tuple2<String, Integer> accumulator) {
accumulator.f0 = value.f0;
accumulator.f1 = value.f2;
+ return accumulator;
}
@Override
@@ -1696,7 +1697,9 @@ public class WindowTranslationTest {
}
@Override
- public void add(T value, T accumulator) {}
+ public T add(T value, T accumulator) {
+ return accumulator;
+ }
@Override
public T getResult(T accumulator) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f176c917/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index cc55f0d..916884f 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -1988,7 +1988,7 @@ class DummyAggregator extends AggregateFunction[(String, Int), (String, Int), (S
override def getResult(accumulator: (String, Int)): (String, Int) = accumulator
- override def add(value: (String, Int), accumulator: (String, Int)): Unit = ()
+ override def add(value: (String, Int), accumulator: (String, Int)): (String, Int) = accumulator
}
class DummyRichAggregator extends RichAggregateFunction[(String, Int), (String, Int), (String, Int)]
@@ -2000,7 +2000,7 @@ class DummyRichAggregator extends RichAggregateFunction[(String, Int), (String,
override def getResult(accumulator: (String, Int)): (String, Int) = accumulator
- override def add(value: (String, Int), accumulator: (String, Int)): Unit = ()
+ override def add(value: (String, Int), accumulator: (String, Int)): (String, Int) = accumulator
}
class TestWindowFunction