You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/11 01:23:15 UTC
[5/5] flink git commit: [FLINK-1619] [FLINK-1620] Grouped sliding
prereducers added for Time and Count
[FLINK-1619] [FLINK-1620] Grouped sliding prereducers added for Time and Count
Closes #465
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2522f028
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2522f028
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2522f028
Branch: refs/heads/master
Commit: 2522f028bde9af57ba52904855265e6a8519e724
Parents: aacd4f2
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Mar 9 13:43:32 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Wed Mar 11 01:22:10 2015 +0100
----------------------------------------------------------------------
.../api/datastream/DataStreamSource.java | 3 +
.../api/datastream/WindowedDataStream.java | 50 ++--
.../streaming/api/windowing/WindowUtils.java | 7 +-
.../SlidingCountGroupedPreReducer.java | 86 ++++++
.../windowbuffer/SlidingGroupedPreReducer.java | 148 ++++++++++
.../windowbuffer/SlidingPreReducer.java | 31 +-
.../SlidingTimeGroupedPreReducer.java | 100 +++++++
.../windowing/WindowIntegrationTest.java | 155 +++++-----
.../SlidingCountGroupedPreReducerTest.java | 220 +++++++++++++++
.../SlidingTimeGroupedPreReducerTest.java | 280 +++++++++++++++++++
10 files changed, 979 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index b596cbd..0dda976 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -35,6 +35,9 @@ public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataS
TypeInformation<OUT> outTypeInfo, StreamInvokable<?, ?> invokable, boolean isParallel) {
super(environment, operatorType, outTypeInfo, invokable);
this.isParallel = isParallel;
+ if (!isParallel) {
+ setParallelism(1);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 6ce9f9f..8199b22 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -56,7 +56,9 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
import org.apache.flink.streaming.api.windowing.windowbuffer.CompletePreAggregator;
+import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountPreReducer;
+import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimePreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer;
@@ -337,8 +339,10 @@ public class WindowedDataStream<OUT> {
}
private int getDiscretizerParallelism() {
- return isLocal || WindowUtils.isParallelPolicy(getTrigger(), getEviction())
- || (discretizerKey != null) ? dataStream.environment.getDegreeOfParallelism() : 1;
+ return isLocal
+ || WindowUtils.isParallelPolicy(getTrigger(), getEviction(),
+ dataStream.getParallelism()) || (discretizerKey != null) ? dataStream.environment
+ .getDegreeOfParallelism() : 1;
}
private StreamInvokable<OUT, WindowEvent<OUT>> getDiscretizer() {
@@ -381,21 +385,35 @@ public class WindowedDataStream<OUT> {
(ReduceFunction<OUT>) transformation.getUDF(), groupByKey, getType()
.createSerializer(getExecutionConfig()));
}
- } else if (WindowUtils.isSlidingCountPolicy(trigger, eviction) && groupByKey == null) {
-
- return new SlidingCountPreReducer<OUT>(
- clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream.getType()
- .createSerializer(getExecutionConfig()),
- WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
- ((CountTriggerPolicy<?>) trigger).getStart());
-
- } else if (WindowUtils.isSlidingTimePolicy(trigger, eviction) && groupByKey == null) {
+ } else if (WindowUtils.isSlidingCountPolicy(trigger, eviction)) {
+ if (groupByKey == null) {
+ return new SlidingCountPreReducer<OUT>(
+ clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+ .getType().createSerializer(getExecutionConfig()),
+ WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
+ ((CountTriggerPolicy<?>) trigger).getStart());
+ } else {
+ return new SlidingCountGroupedPreReducer<OUT>(
+ clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+ .getType().createSerializer(getExecutionConfig()), groupByKey,
+ WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
+ ((CountTriggerPolicy<?>) trigger).getStart());
+ }
- return new SlidingTimePreReducer<OUT>(
- (ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
- .createSerializer(getExecutionConfig()),
- WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
- WindowUtils.getTimeStampWrapper(trigger));
+ } else if (WindowUtils.isSlidingTimePolicy(trigger, eviction)) {
+ if (groupByKey == null) {
+ return new SlidingTimePreReducer<OUT>(
+ (ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
+ .createSerializer(getExecutionConfig()),
+ WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
+ WindowUtils.getTimeStampWrapper(trigger));
+ } else {
+ return new SlidingTimeGroupedPreReducer<OUT>(
+ (ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
+ .createSerializer(getExecutionConfig()), groupByKey,
+ WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
+ WindowUtils.getTimeStampWrapper(trigger));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
index 246aff2..0649b4d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
@@ -44,9 +44,10 @@ public class WindowUtils {
}
}
- public static boolean isParallelPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
- return (eviction instanceof CountEvictionPolicy && (trigger instanceof CountTriggerPolicy || trigger instanceof TimeTriggerPolicy))
- || (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy);
+ public static boolean isParallelPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction,
+ int inputParallelism) {
+ return inputParallelism != 1
+ && ((eviction instanceof CountEvictionPolicy && (trigger instanceof CountTriggerPolicy || trigger instanceof TimeTriggerPolicy)) || (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy));
}
public static boolean isSlidingTimePolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
new file mode 100644
index 0000000..48bf1b6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+
+/**
+ * Grouped pre-reducer for sliding count-based windows.
+ */
+public class SlidingCountGroupedPreReducer<T> extends SlidingGroupedPreReducer<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private long windowSize;
+ private long slideSize;
+ private int start;
+
+ protected long index = 0;
+
+ public SlidingCountGroupedPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+ KeySelector<T, ?> key, long windowSize, long slideSize, int start) {
+ super(reducer, serializer, key);
+ if (windowSize > slideSize) {
+ this.windowSize = windowSize;
+ this.slideSize = slideSize;
+ this.start = start;
+ } else {
+ throw new RuntimeException(
+ "Window size needs to be larger than slide size for the sliding pre-reducer");
+ }
+ index = index - start;
+ }
+
+ @Override
+ protected void afterStore() {
+ index++;
+ }
+
+ @Override
+ public void store(T element) throws Exception {
+ if (index >= 0) {
+ super.store(element);
+ } else {
+ index++;
+ }
+ }
+
+ @Override
+ protected boolean currentEligible(T next) {
+ if (index <= slideSize) {
+ return true;
+ } else {
+ return index == windowSize;
+ }
+ }
+
+ @Override
+ protected void afterEmit() {
+ if (index >= slideSize) {
+ index = index - slideSize;
+ }
+ }
+
+ @Override
+ public SlidingCountGroupedPreReducer<T> clone() {
+ return new SlidingCountGroupedPreReducer<T>(reducer, serializer, key, windowSize,
+ slideSize, start);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
new file mode 100644
index 0000000..aa1d76c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+
+public abstract class SlidingGroupedPreReducer<T> extends SlidingPreReducer<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ protected Map<Object, T> currentReducedMap = new HashMap<Object, T>();
+ protected LinkedList<Map<Object, T>> reducedMap = new LinkedList<Map<Object, T>>();
+
+ protected KeySelector<T, ?> key;
+
+ public SlidingGroupedPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+ KeySelector<T, ?> key) {
+ super(reducer, serializer);
+ this.key = key;
+ }
+
+ public boolean addFinalAggregate(StreamWindow<T> currentWindow) throws Exception {
+ Map<Object, T> finalReduce = null;
+
+ if (!reducedMap.isEmpty()) {
+ finalReduce = reducedMap.get(0);
+ for (int i = 1; i < reducedMap.size(); i++) {
+ finalReduce = reduceMaps(finalReduce, reducedMap.get(i));
+
+ }
+ if (currentReducedMap != null) {
+ finalReduce = reduceMaps(finalReduce, currentReducedMap);
+ }
+
+ } else {
+ finalReduce = currentReducedMap;
+ }
+
+ if (finalReduce != null) {
+ currentWindow.addAll(finalReduce.values());
+ return true;
+ } else {
+ return false;
+ }
+
+ }
+
+ private Map<Object, T> reduceMaps(Map<Object, T> first, Map<Object, T> second) throws Exception {
+
+ Map<Object, T> reduced = new HashMap<Object, T>();
+
+ // Get the common keys in the maps
+ Set<Object> interSection = new HashSet<Object>();
+ Set<Object> diffFirst = new HashSet<Object>();
+ Set<Object> diffSecond = new HashSet<Object>();
+
+ for (Object key : first.keySet()) {
+ if (second.containsKey(key)) {
+ interSection.add(key);
+ } else {
+ diffFirst.add(key);
+ }
+ }
+
+ for (Object key : second.keySet()) {
+ if (!interSection.contains(key)) {
+ diffSecond.add(key);
+ }
+ }
+
+ // Reduce the common keys
+ for (Object key : interSection) {
+ reduced.put(
+ key,
+ reducer.reduce(serializer.copy(first.get(key)),
+ serializer.copy(second.get(key))));
+ }
+
+ for (Object key : diffFirst) {
+ reduced.put(key, first.get(key));
+ }
+
+ for (Object key : diffSecond) {
+ reduced.put(key, second.get(key));
+ }
+
+ return reduced;
+ }
+
+ protected void updateCurrent(T element) throws Exception {
+ if (currentReducedMap == null) {
+ currentReducedMap = new HashMap<Object, T>();
+ currentReducedMap.put(key.getKey(element), element);
+ } else {
+ Object nextKey = key.getKey(element);
+ T last = currentReducedMap.get(nextKey);
+ if (last == null) {
+ currentReducedMap.put(nextKey, element);
+ } else {
+ currentReducedMap.put(nextKey, reducer.reduce(serializer.copy(last), element));
+ }
+ }
+ }
+
+ @Override
+ protected void removeLastReduced() {
+ reducedMap.removeFirst();
+ }
+
+ @Override
+ protected void addCurrentToBuffer(T element) throws Exception {
+ reducedMap.add(currentReducedMap);
+ }
+
+ @Override
+ protected void resetCurrent() {
+ currentReducedMap = null;
+ }
+
+ @Override
+ protected boolean currentNotEmpty() {
+ return currentReducedMap != null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
index 1dd126d..27f7ff5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
@@ -103,15 +103,23 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
}
protected void addToBufferIfEligible(T element) throws Exception {
- if (currentEligible(element) && currentReduced != null) {
+ if (currentEligible(element) && currentNotEmpty()) {
addCurrentToBuffer(element);
elementsPerPreAggregate.add(elementsSinceLastPreAggregate);
- elementsSinceLastPreAggregate = 1;
- } else {
- updateCurrent(element);
-
- elementsSinceLastPreAggregate++;
+ elementsSinceLastPreAggregate = 0;
+ resetCurrent();
}
+ updateCurrent(element);
+
+ elementsSinceLastPreAggregate++;
+ }
+
+ protected void resetCurrent() {
+ currentReduced = null;
+ }
+
+ protected boolean currentNotEmpty() {
+ return currentReduced != null;
}
protected void updateCurrent(T element) throws Exception {
@@ -122,9 +130,8 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
}
}
- protected void addCurrentToBuffer(T element) {
+ protected void addCurrentToBuffer(T element) throws Exception {
reduced.add(currentReduced);
- currentReduced = element;
}
protected abstract boolean currentEligible(T next);
@@ -135,7 +142,7 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
- reduced.removeFirst();
+ removeLastReduced();
lastPreAggregateSize = elementsPerPreAggregate.peek();
}
@@ -144,7 +151,11 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
}
}
- public int max(int a, int b) {
+ protected void removeLastReduced() {
+ reduced.removeFirst();
+ }
+
+ public static int max(int a, int b) {
if (a > b) {
return a;
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
new file mode 100644
index 0000000..1c293af
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+
+/**
+ * Grouped pre-reducer for sliding time-based windows.
+ */
+public class SlidingTimeGroupedPreReducer<T> extends SlidingGroupedPreReducer<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private long windowSize;
+ private long slideSize;
+ private TimestampWrapper<T> timestampWrapper;
+ private T lastStored;
+ protected long windowStartTime;
+
+ public SlidingTimeGroupedPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+ KeySelector<T, ?> key, long windowSize, long slideSize,
+ TimestampWrapper<T> timestampWrapper) {
+ super(reducer, serializer, key);
+ if (windowSize > slideSize) {
+ this.windowSize = windowSize;
+ this.slideSize = slideSize;
+ } else {
+ throw new RuntimeException(
+ "Window size needs to be larger than slide size for the sliding pre-reducer");
+ }
+ this.timestampWrapper = timestampWrapper;
+ this.windowStartTime = timestampWrapper.getStartTime();
+ }
+
+ @Override
+ public void store(T element) throws Exception {
+ super.store(element);
+ lastStored = element;
+ }
+
+ @Override
+ public SlidingTimeGroupedPreReducer<T> clone() {
+ return new SlidingTimeGroupedPreReducer<T>(reducer, serializer, key, windowSize, slideSize,
+ timestampWrapper);
+ }
+
+ @Override
+ public String toString() {
+ return currentReducedMap.toString();
+ }
+
+ @Override
+ protected void afterEmit() {
+ long lastTime = timestampWrapper.getTimestamp(lastStored);
+ if (lastTime - windowStartTime >= slideSize) {
+ windowStartTime = windowStartTime + slideSize;
+ }
+ }
+
+ @Override
+ public void evict(int n) {
+ toRemove += n;
+ Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
+
+ while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
+ toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
+ removeLastReduced();
+ lastPreAggregateSize = elementsPerPreAggregate.peek();
+ }
+
+ if (toRemove > 0 && lastPreAggregateSize == null) {
+ resetCurrent();
+ toRemove = 0;
+ }
+ }
+
+ @Override
+ protected boolean currentEligible(T next) {
+ return windowStartTime == timestampWrapper.getStartTime()
+ || timestampWrapper.getTimestamp(next) - windowStartTime >= slideSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index bcfa188..d7338a0 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -44,7 +44,7 @@ public class WindowIntegrationTest implements Serializable {
private static final Integer MEMORYSIZE = 32;
@SuppressWarnings("serial")
- private static class ModKey implements KeySelector<Integer, Integer> {
+ public static class ModKey implements KeySelector<Integer, Integer> {
private int m;
public ModKey(int m) {
@@ -126,82 +126,93 @@ public class WindowIntegrationTest implements Serializable {
source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten()
.addSink(new DistributedSink4());
- source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
- .addSink(new DistributedSink5());
+ source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).groupBy(new ModKey(2)).sum(0)
+ .getDiscretizedStream().addSink(new DistributedSink5());
env.execute();
- // sum ( Time of 3 slide 2 )
- List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
- expected1.add(StreamWindow.fromElements(5));
- expected1.add(StreamWindow.fromElements(11));
- expected1.add(StreamWindow.fromElements(9));
- expected1.add(StreamWindow.fromElements(10));
- expected1.add(StreamWindow.fromElements(32));
-
- validateOutput(expected1, CentralSink1.windows);
-
- // Tumbling Time of 4 grouped by mod 2
- List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
- expected2.add(StreamWindow.fromElements(2, 2, 4));
- expected2.add(StreamWindow.fromElements(1, 3));
- expected2.add(StreamWindow.fromElements(5));
- expected2.add(StreamWindow.fromElements(10));
- expected2.add(StreamWindow.fromElements(11, 11));
-
- validateOutput(expected2, CentralSink2.windows);
-
- // groupby mod 2 sum ( Tumbling Time of 4)
- List<StreamWindow<Integer>> expected3 = new ArrayList<StreamWindow<Integer>>();
- expected3.add(StreamWindow.fromElements(4));
- expected3.add(StreamWindow.fromElements(5));
- expected3.add(StreamWindow.fromElements(22));
- expected3.add(StreamWindow.fromElements(8));
- expected3.add(StreamWindow.fromElements(10));
-
- validateOutput(expected3, DistributedSink1.windows);
-
- // groupby mod3 Tumbling Count of 2 grouped by mod 2
- List<StreamWindow<Integer>> expected4 = new ArrayList<StreamWindow<Integer>>();
- expected4.add(StreamWindow.fromElements(2, 2));
- expected4.add(StreamWindow.fromElements(1));
- expected4.add(StreamWindow.fromElements(4));
- expected4.add(StreamWindow.fromElements(5, 11));
- expected4.add(StreamWindow.fromElements(10));
- expected4.add(StreamWindow.fromElements(11));
- expected4.add(StreamWindow.fromElements(3));
-
- validateOutput(expected4, DistributedSink2.windows);
-
- // min ( Time of 2 slide 3 )
- List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
- expected5.add(StreamWindow.fromElements(1));
- expected5.add(StreamWindow.fromElements(4));
- expected5.add(StreamWindow.fromElements(10));
-
- validateOutput(expected5, CentralSink3.windows);
-
- // groupby mod 2 max ( Tumbling Time of 4)
- List<StreamWindow<Integer>> expected6 = new ArrayList<StreamWindow<Integer>>();
- expected6.add(StreamWindow.fromElements(3));
- expected6.add(StreamWindow.fromElements(5));
- expected6.add(StreamWindow.fromElements(11));
- expected6.add(StreamWindow.fromElements(4));
- expected6.add(StreamWindow.fromElements(10));
-
- validateOutput(expected6, DistributedSink3.windows);
-
- List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>();
- expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
- expected7.add(StreamWindow.fromElements(10));
- expected7.add(StreamWindow.fromElements(10, 11, 11));
-
- validateOutput(expected7, DistributedSink4.windows);
+ // sum ( Time of 3 slide 2 )
+ List<StreamWindow<Integer>> expected1 = new
+ ArrayList<StreamWindow<Integer>>();
+ expected1.add(StreamWindow.fromElements(5));
+ expected1.add(StreamWindow.fromElements(11));
+ expected1.add(StreamWindow.fromElements(9));
+ expected1.add(StreamWindow.fromElements(10));
+ expected1.add(StreamWindow.fromElements(32));
+
+ validateOutput(expected1, CentralSink1.windows);
+
+ // Tumbling Time of 4 grouped by mod 2
+ List<StreamWindow<Integer>> expected2 = new
+ ArrayList<StreamWindow<Integer>>();
+ expected2.add(StreamWindow.fromElements(2, 2, 4));
+ expected2.add(StreamWindow.fromElements(1, 3));
+ expected2.add(StreamWindow.fromElements(5));
+ expected2.add(StreamWindow.fromElements(10));
+ expected2.add(StreamWindow.fromElements(11, 11));
+
+ validateOutput(expected2, CentralSink2.windows);
+
+ // groupby mod 2 sum ( Tumbling Time of 4)
+ List<StreamWindow<Integer>> expected3 = new
+ ArrayList<StreamWindow<Integer>>();
+ expected3.add(StreamWindow.fromElements(4));
+ expected3.add(StreamWindow.fromElements(5));
+ expected3.add(StreamWindow.fromElements(22));
+ expected3.add(StreamWindow.fromElements(8));
+ expected3.add(StreamWindow.fromElements(10));
+
+ validateOutput(expected3, DistributedSink1.windows);
+
+ // groupby mod3 Tumbling Count of 2 grouped by mod 2
+ List<StreamWindow<Integer>> expected4 = new
+ ArrayList<StreamWindow<Integer>>();
+ expected4.add(StreamWindow.fromElements(2, 2));
+ expected4.add(StreamWindow.fromElements(1));
+ expected4.add(StreamWindow.fromElements(4));
+ expected4.add(StreamWindow.fromElements(5, 11));
+ expected4.add(StreamWindow.fromElements(10));
+ expected4.add(StreamWindow.fromElements(11));
+ expected4.add(StreamWindow.fromElements(3));
+
+ validateOutput(expected4, DistributedSink2.windows);
+
+ // min ( Time of 2 slide 3 )
+ List<StreamWindow<Integer>> expected5 = new
+ ArrayList<StreamWindow<Integer>>();
+ expected5.add(StreamWindow.fromElements(1));
+ expected5.add(StreamWindow.fromElements(4));
+ expected5.add(StreamWindow.fromElements(10));
+
+ validateOutput(expected5, CentralSink3.windows);
+
+ // groupby mod 2 max ( Tumbling Time of 4)
+ List<StreamWindow<Integer>> expected6 = new
+ ArrayList<StreamWindow<Integer>>();
+ expected6.add(StreamWindow.fromElements(3));
+ expected6.add(StreamWindow.fromElements(5));
+ expected6.add(StreamWindow.fromElements(11));
+ expected6.add(StreamWindow.fromElements(4));
+ expected6.add(StreamWindow.fromElements(10));
+
+ validateOutput(expected6, DistributedSink3.windows);
+
+ List<StreamWindow<Integer>> expected7 = new
+ ArrayList<StreamWindow<Integer>>();
+ expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
+ expected7.add(StreamWindow.fromElements(10));
+ expected7.add(StreamWindow.fromElements(10, 11, 11));
+
+ validateOutput(expected7, DistributedSink4.windows);
List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>();
- expected8.add(StreamWindow.fromElements(12));
- expected8.add(StreamWindow.fromElements(9));
- expected8.add(StreamWindow.fromElements(32));
+ expected8.add(StreamWindow.fromElements(4, 8));
+ expected8.add(StreamWindow.fromElements(4, 5));
+ expected8.add(StreamWindow.fromElements(10, 22));
+
+ for (List<Integer> sw : DistributedSink5.windows) {
+ Collections.sort(sw);
+ }
validateOutput(expected8, DistributedSink5.windows);
http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
new file mode 100644
index 0000000..4e63f89
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import static org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedPreReducerTest.checkResults;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.invokable.operator.windowing.WindowIntegrationTest;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.junit.Test;
+
+public class SlidingCountGroupedPreReducerTest {
+
+ TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
+
+ ReduceFunction<Integer> reducer = new SumReducer();
+
+ KeySelector<Integer, ?> key = new WindowIntegrationTest.ModKey(2);
+
+ @Test
+ public void testPreReduce1() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
+ reducer, serializer, key, 3, 2, 0);
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.emitWindow(collector);
+ preReducer.store(3);
+ preReducer.store(4);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(9);
+ preReducer.store(10);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(11);
+ preReducer.store(12);
+ preReducer.emitWindow(collector);
+ preReducer.store(13);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(1, 2));
+ expected.add(StreamWindow.fromElements(3, 6));
+ expected.add(StreamWindow.fromElements(5, 10));
+ expected.add(StreamWindow.fromElements(7, 14));
+ expected.add(StreamWindow.fromElements(9, 18));
+ expected.add(StreamWindow.fromElements(11, 22));
+
+ checkResults(expected, collector.getCollected());
+ }
+
+ @Test
+ public void testPreReduce2() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
+ reducer, serializer, key, 5, 2, 0);
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.emitWindow(collector);
+ preReducer.store(3);
+ preReducer.store(4);
+ preReducer.emitWindow(collector);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(9);
+ preReducer.store(10);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(11);
+ preReducer.store(12);
+ preReducer.emitWindow(collector);
+ preReducer.store(13);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(1, 2));
+ expected.add(StreamWindow.fromElements(4, 6));
+ expected.add(StreamWindow.fromElements(12, 8));
+ expected.add(StreamWindow.fromElements(18, 12));
+ expected.add(StreamWindow.fromElements(24, 16));
+ expected.add(StreamWindow.fromElements(30, 20));
+
+ checkResults(expected, collector.getCollected());
+ }
+
+ @Test
+ public void testPreReduce3() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
+ reducer, serializer, key, 6, 3, 0);
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.store(3);
+ preReducer.emitWindow(collector);
+ preReducer.store(4);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.emitWindow(collector);
+ preReducer.evict(3);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.store(9);
+ preReducer.emitWindow(collector);
+ preReducer.evict(3);
+ preReducer.store(10);
+ preReducer.store(11);
+ preReducer.store(12);
+ preReducer.emitWindow(collector);
+ preReducer.evict(3);
+ preReducer.store(13);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(2, 4));
+ expected.add(StreamWindow.fromElements(9, 12));
+ expected.add(StreamWindow.fromElements(21, 18));
+ expected.add(StreamWindow.fromElements(30, 27));
+
+ checkResults(expected, collector.getCollected());
+ }
+
+ @Test
+ public void testPreReduce4() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
+ reducer, serializer, key, 5, 1, 2);
+
+ preReducer.store(1);
+ preReducer.evict(1);
+ preReducer.store(1);
+ preReducer.evict(1);
+ preReducer.store(1);
+ preReducer.emitWindow(collector);
+ preReducer.store(2);
+ preReducer.emitWindow(collector);
+ preReducer.store(3);
+ preReducer.emitWindow(collector);
+ preReducer.store(4);
+ preReducer.emitWindow(collector);
+ preReducer.store(5);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+ preReducer.store(6);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+ preReducer.store(7);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+ preReducer.store(8);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(1));
+ expected.add(StreamWindow.fromElements(1, 2));
+ expected.add(StreamWindow.fromElements(4, 2));
+ expected.add(StreamWindow.fromElements(4, 6));
+ expected.add(StreamWindow.fromElements(9, 6));
+ expected.add(StreamWindow.fromElements(8, 12));
+ expected.add(StreamWindow.fromElements(15, 10));
+ expected.add(StreamWindow.fromElements(12, 18));
+
+ checkResults(expected, collector.getCollected());
+ }
+
+ private static class SumReducer implements ReduceFunction<Integer> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer reduce(Integer value1, Integer value2) throws Exception {
+ return value1 + value2;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
new file mode 100644
index 0000000..83ad7ed
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.invokable.operator.windowing.WindowIntegrationTest;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.junit.Test;
+
+/**
+ * Tests for {@link SlidingTimeGroupedPreReducer} using a summing reducer, a
+ * {@code ModKey(2)} key selector (presumably grouping by value modulo 2 --
+ * confirm against {@code WindowIntegrationTest.ModKey}) and a timestamp that
+ * is the stored value itself.
+ *
+ * Every test drives the pre-reducer by hand through store / evict / emitWindow
+ * calls and compares the emitted windows with a hard-coded expectation.
+ */
+public class SlidingTimeGroupedPreReducerTest {
+
+    TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
+
+    ReduceFunction<Integer> reducer = new SumReducer();
+
+    KeySelector<Integer, ?> key = new WindowIntegrationTest.ModKey(2);
+
+    /** Pre-reducer built with (3, 2); six windows are emitted. */
+    @Test
+    public void testPreReduce1() throws Exception {
+        TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+        SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
+                reducer, serializer, key, 3, 2, identityTimestamp());
+
+        preReducer.store(1);
+        preReducer.store(2);
+        preReducer.emitWindow(collector);
+        preReducer.store(3);
+        preReducer.store(4);
+        preReducer.evict(1);
+        preReducer.emitWindow(collector);
+        preReducer.evict(2);
+        preReducer.store(5);
+        preReducer.store(6);
+        preReducer.emitWindow(collector);
+        preReducer.evict(2);
+        preReducer.store(7);
+        preReducer.store(8);
+        preReducer.emitWindow(collector);
+        preReducer.evict(2);
+        preReducer.store(9);
+        preReducer.store(10);
+        preReducer.emitWindow(collector);
+        preReducer.evict(2);
+        preReducer.store(11);
+        preReducer.store(12);
+        preReducer.emitWindow(collector);
+        // Stored after the last emitWindow call, so it is not part of any collected window.
+        preReducer.store(13);
+
+        List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+        expected.add(StreamWindow.fromElements(1, 2));
+        expected.add(StreamWindow.fromElements(3, 6));
+        expected.add(StreamWindow.fromElements(5, 10));
+        expected.add(StreamWindow.fromElements(7, 14));
+        expected.add(StreamWindow.fromElements(9, 18));
+        expected.add(StreamWindow.fromElements(11, 22));
+
+        checkResults(expected, collector.getCollected());
+    }
+
+    /**
+     * Asserts that two window lists are equal, ignoring the order of the
+     * elements inside each window.
+     *
+     * Note: every window in BOTH lists is sorted in place, so the arguments
+     * are modified by this call.
+     *
+     * @param expected the windows the test expects
+     * @param actual the windows that were actually collected
+     */
+    protected static void checkResults(List<StreamWindow<Integer>> expected,
+            List<StreamWindow<Integer>> actual) {
+
+        for (StreamWindow<Integer> sw : expected) {
+            Collections.sort(sw);
+        }
+
+        for (StreamWindow<Integer> sw : actual) {
+            Collections.sort(sw);
+        }
+
+        assertEquals(expected, actual);
+    }
+
+    /** Pre-reducer built with (5, 2); the first eviction happens only after six stores. */
+    @Test
+    public void testPreReduce2() throws Exception {
+        TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+        SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
+                reducer, serializer, key, 5, 2, identityTimestamp());
+
+        preReducer.store(1);
+        preReducer.store(2);
+        preReducer.emitWindow(collector);
+        preReducer.store(3);
+        preReducer.store(4);
+        preReducer.emitWindow(collector);
+        preReducer.store(5);
+        preReducer.store(6);
+        preReducer.evict(1);
+        preReducer.emitWindow(collector);
+        preReducer.evict(2);
+        preReducer.store(7);
+        preReducer.store(8);
+        preReducer.emitWindow(collector);
+        preReducer.evict(2);
+        preReducer.store(9);
+        preReducer.store(10);
+        preReducer.emitWindow(collector);
+        preReducer.evict(2);
+        preReducer.store(11);
+        preReducer.store(12);
+        preReducer.emitWindow(collector);
+        // Stored after the last emitWindow call, so it is not part of any collected window.
+        preReducer.store(13);
+
+        List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+        expected.add(StreamWindow.fromElements(1, 2));
+        expected.add(StreamWindow.fromElements(4, 6));
+        expected.add(StreamWindow.fromElements(12, 8));
+        expected.add(StreamWindow.fromElements(18, 12));
+        expected.add(StreamWindow.fromElements(24, 16));
+        expected.add(StreamWindow.fromElements(30, 20));
+
+        checkResults(expected, collector.getCollected());
+    }
+
+    /** Pre-reducer built with (6, 3); elements are stored and evicted three at a time. */
+    @Test
+    public void testPreReduce3() throws Exception {
+        TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+        SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
+                reducer, serializer, key, 6, 3, identityTimestamp());
+
+        preReducer.store(1);
+        preReducer.store(2);
+        preReducer.store(3);
+        preReducer.emitWindow(collector);
+        preReducer.store(4);
+        preReducer.store(5);
+        preReducer.store(6);
+        preReducer.emitWindow(collector);
+        preReducer.evict(3);
+        preReducer.store(7);
+        preReducer.store(8);
+        preReducer.store(9);
+        preReducer.emitWindow(collector);
+        preReducer.evict(3);
+        preReducer.store(10);
+        preReducer.store(11);
+        preReducer.store(12);
+        preReducer.emitWindow(collector);
+        preReducer.evict(3);
+        // Stored after the last emitWindow call, so it is not part of any collected window.
+        preReducer.store(13);
+
+        List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+        expected.add(StreamWindow.fromElements(2, 4));
+        expected.add(StreamWindow.fromElements(9, 12));
+        expected.add(StreamWindow.fromElements(21, 18));
+        expected.add(StreamWindow.fromElements(30, 27));
+
+        checkResults(expected, collector.getCollected());
+    }
+
+    /**
+     * Pre-reducer built with (3, 2); after the value 8 the stored values jump
+     * to 14 and 21, and emitWindow is called several times in a row between
+     * stores. Fifteen emitWindow calls are made but only nine windows are
+     * expected.
+     */
+    @Test
+    public void testPreReduce4() throws Exception {
+        TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+        SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
+                reducer, serializer, key, 3, 2, identityTimestamp());
+
+        preReducer.store(1);
+        preReducer.store(2);
+        preReducer.emitWindow(collector);
+        preReducer.store(3);
+        preReducer.store(4);
+        preReducer.evict(1);
+        preReducer.emitWindow(collector);
+        preReducer.evict(2);
+        preReducer.store(5);
+        preReducer.store(6);
+        preReducer.emitWindow(collector);
+        preReducer.evict(2);
+        preReducer.store(7);
+        preReducer.store(8);
+        preReducer.emitWindow(collector);
+        preReducer.evict(2);
+        preReducer.emitWindow(collector);
+        preReducer.emitWindow(collector);
+        preReducer.evict(2);
+        preReducer.store(14);
+        preReducer.emitWindow(collector);
+        preReducer.emitWindow(collector);
+        preReducer.evict(1);
+        preReducer.emitWindow(collector);
+        preReducer.emitWindow(collector);
+        preReducer.store(21);
+        preReducer.emitWindow(collector);
+        preReducer.evict(1);
+        preReducer.emitWindow(collector);
+
+        // Stored after the last emitWindow call, so it is not part of any collected window.
+        preReducer.store(9);
+
+        List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+        expected.add(StreamWindow.fromElements(1, 2));
+        expected.add(StreamWindow.fromElements(3, 6));
+        expected.add(StreamWindow.fromElements(5, 10));
+        expected.add(StreamWindow.fromElements(7, 14));
+        expected.add(StreamWindow.fromElements(8));
+        expected.add(StreamWindow.fromElements(8));
+        expected.add(StreamWindow.fromElements(14));
+        expected.add(StreamWindow.fromElements(14));
+        expected.add(StreamWindow.fromElements(21));
+
+        checkResults(expected, collector.getCollected());
+    }
+
+    /**
+     * Creates the timestamp wrapper shared by all tests: the timestamp of an
+     * element is the element's own integer value.
+     *
+     * The method is static so that the anonymous {@link Timestamp} does not
+     * hold a reference to the test instance.
+     *
+     * NOTE(review): the second wrapper argument (1) is presumably the start
+     * time -- confirm against {@code TimestampWrapper}.
+     */
+    private static TimestampWrapper<Integer> identityTimestamp() {
+        return new TimestampWrapper<Integer>(new Timestamp<Integer>() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public long getTimestamp(Integer value) {
+                return value;
+            }
+        }, 1);
+    }
+
+    /** Reduce function that adds its two integer inputs. */
+    private static class SumReducer implements ReduceFunction<Integer> {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Integer reduce(Integer value1, Integer value2) throws Exception {
+            return value1 + value2;
+        }
+
+    }
+}