You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/05/26 10:22:02 UTC
[2/2] flink git commit: [FLINK-2074] Fix erroneous emission of Sliding Time PreReducer
[FLINK-2074] Fix erroneous emission of Sliding Time PreReducer
Before this, a sliding time window would keep emitting the last result
because the number of elements per pre-aggregation result was not
correctly reset on eviction.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bc6dbec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bc6dbec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bc6dbec
Branch: refs/heads/master
Commit: 6bc6dbec6878f58500370b2e6912ad5022c5bf78
Parents: 4de2353
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri May 22 16:30:58 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue May 26 10:21:31 2015 +0200
----------------------------------------------------------------------
.../windowbuffer/SlidingGroupedPreReducer.java | 1 +
.../windowbuffer/SlidingTimePreReducer.java | 1 +
.../SlidingTimeGroupedPreReducerTest.java | 168 +++++++++++++++----
.../windowbuffer/SlidingTimePreReducerTest.java | 116 ++++++++++---
4 files changed, 230 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6dbec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
index 0872c6e..09fadf9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
@@ -143,6 +143,7 @@ public abstract class SlidingGroupedPreReducer<T> extends SlidingPreReducer<T> {
@Override
protected void resetCurrent() {
currentReducedMap = null;
+ elementsSinceLastPreAggregate = 0;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6dbec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
index 7652d81..d84505c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
@@ -89,6 +89,7 @@ public class SlidingTimePreReducer<T> extends SlidingPreReducer<T> {
if (toRemove > 0 && lastPreAggregateSize == null) {
currentReduced = null;
+ elementsSinceLastPreAggregate = 0;
toRemove = 0;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6dbec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
index 3438f42..18a4748 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
@@ -23,10 +23,14 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.operators.windowing.WindowIntegrationTest;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
@@ -37,61 +41,136 @@ import org.junit.Test;
public class SlidingTimeGroupedPreReducerTest {
TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
+ TypeInformation<Tuple2<Integer,Integer>> tupleType = TypeInfoParser.parse("Tuple2<Integer,Integer>");
+
ReduceFunction<Integer> reducer = new SumReducer();
+ ReduceFunction<Tuple2<Integer, Integer>> tupleReducer = new TupleSumReducer();
+
KeySelector<Integer, ?> key = new WindowIntegrationTest.ModKey(2);
+ KeySelector<Tuple2<Integer, Integer>, ?> tupleKey = new TupleModKey(2);
@Test
public void testPreReduce1() throws Exception {
- TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+ // This ensures that the buffer is properly cleared after a burst of elements by
+ // replaying the same sequence of elements with a later timestamp and expecting the same
+ // result.
- SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
- reducer, serializer, key, 3, 2, new TimestampWrapper<Integer>(
- new Timestamp<Integer>() {
+ TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
- private static final long serialVersionUID = 1L;
+ SlidingTimeGroupedPreReducer<Tuple2<Integer, Integer>> preReducer = new SlidingTimeGroupedPreReducer<Tuple2<Integer, Integer>>(tupleReducer,
+ tupleType.createSerializer(new ExecutionConfig()), tupleKey, 3, 2, new TimestampWrapper<Tuple2<Integer, Integer>>(new Timestamp<Tuple2<Integer, Integer>>() {
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
- }, 1));
+ private static final long serialVersionUID = 1L;
- preReducer.store(1);
- preReducer.store(2);
+ @Override
+ public long getTimestamp(Tuple2<Integer, Integer> value) {
+ return value.f0;
+ }
+ }, 1));
+
+ int timeOffset = 0;
+
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 1));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 2));
preReducer.emitWindow(collector);
- preReducer.store(3);
- preReducer.store(4);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 3));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 4));
preReducer.evict(1);
preReducer.emitWindow(collector);
preReducer.evict(2);
- preReducer.store(5);
- preReducer.store(6);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 5));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 6));
preReducer.emitWindow(collector);
preReducer.evict(2);
- preReducer.store(7);
- preReducer.store(8);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 7));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 8));
preReducer.emitWindow(collector);
preReducer.evict(2);
- preReducer.store(9);
- preReducer.store(10);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 9));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 10));
preReducer.emitWindow(collector);
preReducer.evict(2);
- preReducer.store(11);
- preReducer.store(12);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 11));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 12));
preReducer.emitWindow(collector);
- preReducer.store(13);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 13));
- List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
- expected.add(StreamWindow.fromElements(1, 2));
- expected.add(StreamWindow.fromElements(3, 6));
- expected.add(StreamWindow.fromElements(5, 10));
- expected.add(StreamWindow.fromElements(7, 14));
- expected.add(StreamWindow.fromElements(9, 18));
- expected.add(StreamWindow.fromElements(11, 22));
+ // ensure that everything is cleared out
+ preReducer.evict(100);
- checkResults(expected, collector.getCollected());
+
+ timeOffset = 25; // a little while later...
+
+ // Repeat the same sequence, this should produce the same result
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 1));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 2));
+ preReducer.emitWindow(collector);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 3));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 4));
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 5));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 6));
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 7));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 8));
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 9));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 10));
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 11));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 12));
+ preReducer.emitWindow(collector);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 13));
+
+ List<StreamWindow<Tuple2<Integer, Integer>>> expected = new ArrayList<StreamWindow<Tuple2<Integer, Integer>>>();
+ timeOffset = 0; // rewind ...
+ expected.add(StreamWindow.fromElements(
+ new Tuple2<Integer, Integer>(timeOffset + 2, 2),
+ new Tuple2<Integer, Integer>(timeOffset + 1, 1)));
+ expected.add(StreamWindow.fromElements(
+ new Tuple2<Integer, Integer>(timeOffset + 2, 6),
+ new Tuple2<Integer, Integer>(timeOffset + 3, 3)));
+ expected.add(StreamWindow.fromElements(
+ new Tuple2<Integer, Integer>(timeOffset + 4, 10),
+ new Tuple2<Integer, Integer>(timeOffset + 5, 5)));
+ expected.add(StreamWindow.fromElements(
+ new Tuple2<Integer, Integer>(timeOffset + 6, 14),
+ new Tuple2<Integer, Integer>(timeOffset + 7, 7)));
+ expected.add(StreamWindow.fromElements(
+ new Tuple2<Integer, Integer>(timeOffset + 8, 18),
+ new Tuple2<Integer, Integer>(timeOffset + 9, 9)));
+ expected.add(StreamWindow.fromElements(
+ new Tuple2<Integer, Integer>(timeOffset + 10, 22),
+ new Tuple2<Integer, Integer>(timeOffset + 11, 11)));
+
+ timeOffset = 25; // and back to the future ...
+ expected.add(StreamWindow.fromElements(
+ new Tuple2<Integer, Integer>(timeOffset + 2, 2),
+ new Tuple2<Integer, Integer>(timeOffset + 1, 1)));
+ expected.add(StreamWindow.fromElements(
+ new Tuple2<Integer, Integer>(timeOffset + 2, 6),
+ new Tuple2<Integer, Integer>(timeOffset + 3, 3)));
+ expected.add(StreamWindow.fromElements(
+ new Tuple2<Integer, Integer>(timeOffset + 4, 10),
+ new Tuple2<Integer, Integer>(timeOffset + 5, 5)));
+ expected.add(StreamWindow.fromElements(
+ new Tuple2<Integer, Integer>(timeOffset + 6, 14),
+ new Tuple2<Integer, Integer>(timeOffset + 7, 7)));
+ expected.add(StreamWindow.fromElements(
+ new Tuple2<Integer, Integer>(timeOffset + 8, 18),
+ new Tuple2<Integer, Integer>(timeOffset + 9, 9)));
+ expected.add(StreamWindow.fromElements(
+ new Tuple2<Integer, Integer>(timeOffset + 10, 22),
+ new Tuple2<Integer, Integer>(timeOffset + 11, 11)));
+
+ assertEquals(expected, collector.getCollected());
}
protected static void checkResults(List<StreamWindow<Integer>> expected,
@@ -277,4 +356,31 @@ public class SlidingTimeGroupedPreReducerTest {
}
}
+
+ private static class TupleSumReducer implements ReduceFunction<Tuple2<Integer, Integer>> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
+ return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1);
+ }
+
+ }
+
+ public static class TupleModKey implements KeySelector<Tuple2<Integer, Integer>, Integer> {
+
+ private static final long serialVersionUID = 1L;
+
+ private int m;
+
+ public TupleModKey(int m) {
+ this.m = m;
+ }
+
+ @Override
+ public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
+ return value.f1 % m;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6dbec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
index bc3b13b..a48bc0c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
@@ -22,9 +22,13 @@ import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
@@ -34,56 +38,106 @@ import org.junit.Test;
public class SlidingTimePreReducerTest {
TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
+ TypeInformation<Tuple2<Integer,Integer>> tupleType = TypeInfoParser.parse("Tuple2<Integer,Integer>");
ReduceFunction<Integer> reducer = new SumReducer();
+ ReduceFunction<Tuple2<Integer, Integer>> tupleReducer = new TupleSumReducer();
@Test
public void testPreReduce1() throws Exception {
- TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+ // This ensures that the buffer is properly cleared after a burst of elements by
+ // replaying the same sequence of elements with a later timestamp and expecting the same
+ // result.
- SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
- serializer, 3, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
+ TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+
+ SlidingTimePreReducer<Tuple2<Integer, Integer>> preReducer = new SlidingTimePreReducer<Tuple2<Integer, Integer>>(tupleReducer,
+ tupleType.createSerializer(new ExecutionConfig()), 3, 2, new TimestampWrapper<Tuple2<Integer, Integer>>(new Timestamp<Tuple2<Integer, Integer>>() {
private static final long serialVersionUID = 1L;
@Override
- public long getTimestamp(Integer value) {
- return value;
+ public long getTimestamp(Tuple2<Integer, Integer> value) {
+ return value.f0;
}
}, 1));
- preReducer.store(1);
- preReducer.store(2);
+ int timeOffset = 0;
+
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 1));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 2));
preReducer.emitWindow(collector);
- preReducer.store(3);
- preReducer.store(4);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 3));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 4));
preReducer.evict(1);
preReducer.emitWindow(collector);
preReducer.evict(2);
- preReducer.store(5);
- preReducer.store(6);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 5));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 6));
preReducer.emitWindow(collector);
preReducer.evict(2);
- preReducer.store(7);
- preReducer.store(8);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 7));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 8));
preReducer.emitWindow(collector);
preReducer.evict(2);
- preReducer.store(9);
- preReducer.store(10);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 9));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 10));
preReducer.emitWindow(collector);
preReducer.evict(2);
- preReducer.store(11);
- preReducer.store(12);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 11));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 12));
preReducer.emitWindow(collector);
- preReducer.store(13);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 13));
+
+ // ensure that everything is cleared out
+ preReducer.evict(100);
+
+
+ timeOffset = 25; // a little while later...
+
+ // Repeat the same sequence, this should produce the same result
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 1));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 2));
+ preReducer.emitWindow(collector);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 3));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 4));
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 5));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 6));
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 7));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 8));
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 9));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 10));
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 11));
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 12));
+ preReducer.emitWindow(collector);
+ preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 13));
+
+ List<StreamWindow<Tuple2<Integer, Integer>>> expected = new ArrayList<StreamWindow<Tuple2<Integer, Integer>>>();
+ timeOffset = 0; // rewind ...
+ expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 1, 3)));
+ expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 2, 9)));
+ expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 4, 15)));
+ expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 6, 21)));
+ expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 8, 27)));
+ expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 10, 33)));
+
+ timeOffset = 25; // and back to the future ...
+ expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 1, 3)));
+ expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 2, 9)));
+ expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 4, 15)));
+ expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 6, 21)));
+ expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 8, 27)));
+ expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 10, 33)));
- List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
- expected.add(StreamWindow.fromElements(3));
- expected.add(StreamWindow.fromElements(9));
- expected.add(StreamWindow.fromElements(15));
- expected.add(StreamWindow.fromElements(21));
- expected.add(StreamWindow.fromElements(27));
- expected.add(StreamWindow.fromElements(33));
assertEquals(expected, collector.getCollected());
}
@@ -254,4 +308,16 @@ public class SlidingTimePreReducerTest {
}
}
+
+ private static class TupleSumReducer implements ReduceFunction<Tuple2<Integer, Integer>> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
+ return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1);
+ }
+
+ }
+
}