You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/11 01:23:12 UTC
[2/5] flink git commit: [FLINK-1619] [FLINK-1620] Basic sliding
prereducers added for Time and Count
[FLINK-1619] [FLINK-1620] Basic sliding prereducers added for Time and Count
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70abc16d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70abc16d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70abc16d
Branch: refs/heads/master
Commit: 70abc16df78d47c62b7cc7f1545542b330562113
Parents: 2be00ac
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Mar 4 18:22:13 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue Mar 10 16:33:43 2015 +0100
----------------------------------------------------------------------
.../api/datastream/WindowedDataStream.java | 51 +++-
.../windowing/policy/CountEvictionPolicy.java | 8 +
.../windowing/policy/CountTriggerPolicy.java | 4 +
.../windowing/policy/TimeEvictionPolicy.java | 8 +
.../api/windowing/policy/TimeTriggerPolicy.java | 4 +
.../windowbuffer/BasicWindowBuffer.java | 4 -
.../windowbuffer/SlidingCountPreReducer.java | 83 ++++++
.../windowbuffer/SlidingPreReducer.java | 141 ++++++++++
.../windowbuffer/SlidingTimePreReducer.java | 99 +++++++
.../windowbuffer/TumblingGroupedPreReducer.java | 7 -
.../windowbuffer/TumblingPreReducer.java | 7 -
.../windowing/windowbuffer/WindowBuffer.java | 2 -
.../windowing/WindowIntegrationTest.java | 37 ++-
.../windowbuffer/BasicWindowBufferTest.java | 4 -
.../SlidingCountPreReducerTest.java | 216 ++++++++++++++++
.../windowbuffer/SlidingTimePreReducerTest.java | 257 +++++++++++++++++++
.../TumblingGroupedPreReducerTest.java | 7 -
.../windowbuffer/TumblingPreReducerTest.java | 6 -
18 files changed, 894 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 742ff22..4c7da88 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -46,15 +46,21 @@ import org.apache.flink.streaming.api.windowing.WindowEvent;
import org.apache.flink.streaming.api.windowing.WindowUtils;
import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation;
import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
import org.apache.flink.streaming.api.windowing.windowbuffer.CompletePreAggregator;
+import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountPreReducer;
+import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimePreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
@@ -365,16 +371,41 @@ public class WindowedDataStream<OUT> {
@SuppressWarnings("unchecked")
private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation) {
- if (transformation == WindowTransformation.REDUCEWINDOW
- && getEviction() instanceof TumblingEvictionPolicy) {
- if (groupByKey == null) {
- return new TumblingPreReducer<OUT>(
- clean((ReduceFunction<OUT>) transformation.getUDF()), getType()
- .createSerializer(getExecutionConfig()));
- } else {
- return new TumblingGroupedPreReducer<OUT>(
- clean((ReduceFunction<OUT>) transformation.getUDF()), groupByKey, getType()
- .createSerializer(getExecutionConfig()));
+ if (transformation == WindowTransformation.REDUCEWINDOW) {
+ if (getTrigger() instanceof TumblingEvictionPolicy) {
+ if (groupByKey == null) {
+ return new TumblingPreReducer<OUT>(
+ clean((ReduceFunction<OUT>) transformation.getUDF()), getType()
+ .createSerializer(getExecutionConfig()));
+ } else {
+ return new TumblingGroupedPreReducer<OUT>(
+ clean((ReduceFunction<OUT>) transformation.getUDF()), groupByKey,
+ getType().createSerializer(getExecutionConfig()));
+ }
+ } else if (getTrigger() instanceof CountTriggerPolicy
+ && getEviction() instanceof CountEvictionPolicy && groupByKey == null) {
+
+ int slide = ((CountTriggerPolicy<OUT>) getTrigger()).getSlideSize();
+ int window = ((CountEvictionPolicy<OUT>) getEviction()).getWindowSize();
+ int start = ((CountEvictionPolicy<OUT>) getEviction()).getStart();
+ if (slide < window) {
+ return new SlidingCountPreReducer<OUT>(
+ clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+ .getType().createSerializer(getExecutionConfig()), window,
+ slide, start);
+ }
+ } else if (getTrigger() instanceof TimeTriggerPolicy
+ && getEviction() instanceof TimeEvictionPolicy && groupByKey == null) {
+ int slide = (int) ((TimeTriggerPolicy<OUT>) getTrigger()).getSlideSize();
+ int window = (int) ((TimeEvictionPolicy<OUT>) getEviction()).getWindowSize();
+ TimestampWrapper<OUT> wrapper = ((TimeEvictionPolicy<OUT>) getEviction())
+ .getTimeStampWrapper();
+ if (slide < window) {
+ return new SlidingTimePreReducer<OUT>(
+ clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+ .getType().createSerializer(getExecutionConfig()), window,
+ slide, wrapper);
+ }
}
}
return new BasicWindowBuffer<OUT>();
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
index 3ede27b..07478c6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
@@ -136,6 +136,14 @@ public class CountEvictionPolicy<IN> implements CloneableEvictionPolicy<IN> {
}
}
+ public int getWindowSize() {
+ return maxElements;
+ }
+
+ public int getStart() {
+ return startValue;
+ }
+
@Override
public String toString() {
return "CountPolicy(" + maxElements + ")";
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
index 6d8149a..d439f72 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
@@ -102,6 +102,10 @@ public class CountTriggerPolicy<IN> implements CloneableTriggerPolicy<IN> {
}
}
+ public int getSlideSize() {
+ return max;
+ }
+
@Override
public String toString() {
return "CountPolicy(" + max + ")";
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
index 982b6d5..ae17e29 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
@@ -150,10 +150,18 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
}
}
+ public long getWindowSize() {
+ return granularity;
+ }
+
@Override
public String toString() {
return "TimePolicy(" + granularity + ", " + timestampWrapper.getClass().getSimpleName()
+ ")";
}
+ public TimestampWrapper<DATA> getTimeStampWrapper() {
+ return timestampWrapper;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
index 7065582..ce8f16e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
@@ -213,6 +213,10 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
}
}
+ public long getSlideSize() {
+ return granularity;
+ }
+
@Override
public String toString() {
return "TimePolicy(" + granularity + ", " + timestampWrapper.getClass().getSimpleName()
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
index 458de41..8e39398 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
@@ -63,10 +63,6 @@ public class BasicWindowBuffer<T> implements WindowBuffer<T> {
}
}
- public int size() {
- return buffer.size();
- }
-
@Override
public BasicWindowBuffer<T> clone() {
return new BasicWindowBuffer<T>();
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
new file mode 100644
index 0000000..ff358cb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Non-grouped pre-reducer for tumbling eviction policy.
+ */
+public class SlidingCountPreReducer<T> extends SlidingPreReducer<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private int windowSize;
+ private int slideSize;
+ private int start;
+
+ public SlidingCountPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+ int windowSize, int slideSize, int start) {
+ super(reducer, serializer);
+ if (windowSize > slideSize) {
+ this.windowSize = windowSize;
+ this.slideSize = slideSize;
+ this.start = start;
+ } else {
+ throw new RuntimeException(
+ "Window size needs to be larger than slide size for the sliding pre-reducer");
+ }
+ index = index - start;
+ }
+
+ @Override
+ public SlidingCountPreReducer<T> clone() {
+ return new SlidingCountPreReducer<T>(reducer, serializer, windowSize, slideSize, start);
+ }
+
+ @Override
+ public void store(T element) throws Exception {
+ if (index >= 0) {
+ super.store(element);
+ } else {
+ index++;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return currentReduced.toString();
+ }
+
+ @Override
+ protected boolean addCurrentToReduce(T next) {
+ if (index <= slideSize) {
+ return true;
+ } else {
+ return index == windowSize;
+ }
+ }
+
+ @Override
+ protected void updateIndexAtEmit() {
+ if (index >= slideSize) {
+ index = index - slideSize;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
new file mode 100644
index 0000000..a21bb61
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import java.util.LinkedList;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.util.Collector;
+
+public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompletePreAggregator {
+
+ private static final long serialVersionUID = 1L;
+
+ protected ReduceFunction<T> reducer;
+
+ protected T currentReduced;
+ protected LinkedList<T> reduced = new LinkedList<T>();
+ protected LinkedList<Integer> elementsPerPreAggregate = new LinkedList<Integer>();
+
+ protected TypeSerializer<T> serializer;
+
+ protected int index = 0;
+ protected int toRemove = 0;
+
+ protected int elementsSinceLastPreAggregate = 0;
+
+ public SlidingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer) {
+ this.reducer = reducer;
+ this.serializer = serializer;
+ }
+
+ public boolean emitWindow(Collector<StreamWindow<T>> collector) {
+ StreamWindow<T> currentWindow = new StreamWindow<T>();
+ T finalAggregate = getFinalAggregate();
+ if (finalAggregate != null) {
+ currentWindow.add(finalAggregate);
+ collector.collect(currentWindow);
+ updateIndexAtEmit();
+ return true;
+ } else {
+ updateIndexAtEmit();
+ return false;
+ }
+
+ }
+
+ protected abstract void updateIndexAtEmit();
+
+ public T getFinalAggregate() {
+ try {
+ if (!reduced.isEmpty()) {
+ T finalReduce = reduced.get(0);
+ for (int i = 1; i < reduced.size(); i++) {
+ finalReduce = reducer.reduce(finalReduce, serializer.copy(reduced.get(i)));
+
+ }
+ if (currentReduced != null) {
+ finalReduce = reducer.reduce(finalReduce, serializer.copy(currentReduced));
+ }
+ return finalReduce;
+ } else {
+ return currentReduced;
+ }
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void store(T element) throws Exception {
+ addToBufferIfEligible(element);
+ index++;
+ }
+
+ protected void addToBufferIfEligible(T element) throws Exception {
+ if (addCurrentToReduce(element) && currentReduced != null) {
+ reduced.add(currentReduced);
+ elementsPerPreAggregate.add(elementsSinceLastPreAggregate);
+ currentReduced = element;
+ elementsSinceLastPreAggregate = 1;
+ } else {
+ if (currentReduced == null) {
+ currentReduced = element;
+ } else {
+ currentReduced = reducer.reduce(serializer.copy(currentReduced), element);
+ }
+ elementsSinceLastPreAggregate++;
+ }
+ }
+
+ protected abstract boolean addCurrentToReduce(T next);
+
+ public void evict(int n) {
+ toRemove += n;
+
+ Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
+ while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
+ toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
+ reduced.removeFirst();
+ lastPreAggregateSize = elementsPerPreAggregate.peek();
+ }
+
+ if (lastPreAggregateSize == null) {
+ toRemove = 0;
+ }
+ }
+
+ public int max(int a, int b) {
+ if (a > b) {
+ return a;
+ } else {
+ return b;
+ }
+ }
+
+ @Override
+ public abstract SlidingPreReducer<T> clone();
+
+ @Override
+ public String toString() {
+ return currentReduced.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
new file mode 100644
index 0000000..af2239c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+
+/**
+ * Non-grouped pre-reducer for tumbling eviction policy.
+ */
+public class SlidingTimePreReducer<T> extends SlidingPreReducer<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private int windowSize;
+ private int slideSize;
+ private TimestampWrapper<T> timestampWrapper;
+ private T lastStored;
+ protected long windowStartTime;
+
+ public SlidingTimePreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+ int windowSize, int slideSize, TimestampWrapper<T> timestampWrapper) {
+ super(reducer, serializer);
+ if (windowSize > slideSize) {
+ this.windowSize = windowSize;
+ this.slideSize = slideSize;
+ } else {
+ throw new RuntimeException(
+ "Window size needs to be larger than slide size for the sliding pre-reducer");
+ }
+ this.timestampWrapper = timestampWrapper;
+ this.windowStartTime = timestampWrapper.getStartTime();
+ }
+
+ @Override
+ public void store(T element) throws Exception {
+ super.store(element);
+ lastStored = element;
+ }
+
+ @Override
+ public SlidingTimePreReducer<T> clone() {
+ return new SlidingTimePreReducer<T>(reducer, serializer, windowSize, slideSize,
+ timestampWrapper);
+ }
+
+ @Override
+ public String toString() {
+ return currentReduced.toString();
+ }
+
+ @Override
+ protected void updateIndexAtEmit() {
+ index = 0;
+ long lastTime = timestampWrapper.getTimestamp(lastStored);
+ if (lastTime - windowStartTime >= slideSize) {
+ windowStartTime = windowStartTime + slideSize;
+ }
+ }
+
+ @Override
+ public void evict(int n) {
+ toRemove += n;
+ Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
+
+ while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
+ toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
+ reduced.removeFirst();
+ lastPreAggregateSize = elementsPerPreAggregate.peek();
+ }
+
+ if (toRemove > 0 && lastPreAggregateSize == null) {
+ currentReduced = null;
+ toRemove = 0;
+ }
+ }
+
+ @Override
+ protected boolean addCurrentToReduce(T next) {
+ return windowStartTime == timestampWrapper.getStartTime()
+ || timestampWrapper.getTimestamp(next) - windowStartTime >= slideSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
index 7403ffe..9431a99 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
@@ -38,7 +38,6 @@ public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePr
private Map<Object, T> reducedValues;
- private int numOfElements = 0;
private TypeSerializer<T> serializer;
public TumblingGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector,
@@ -56,7 +55,6 @@ public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePr
currentWindow.addAll(reducedValues.values());
collector.collect(currentWindow);
reducedValues.clear();
- numOfElements = 0;
return true;
} else {
return false;
@@ -76,16 +74,11 @@ public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePr
}
reducedValues.put(key, reduced);
- numOfElements++;
}
public void evict(int n) {
}
- public int size() {
- return numOfElements;
- }
-
@Override
public TumblingGroupedPreReducer<T> clone() {
return new TumblingGroupedPreReducer<T>(reducer, keySelector, serializer);
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
index 35cf60e..58b30a6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
@@ -32,7 +32,6 @@ public class TumblingPreReducer<T> implements WindowBuffer<T>, CompletePreAggreg
private ReduceFunction<T> reducer;
private T reduced;
- private int numOfElements = 0;
private TypeSerializer<T> serializer;
public TumblingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer) {
@@ -46,7 +45,6 @@ public class TumblingPreReducer<T> implements WindowBuffer<T>, CompletePreAggreg
currentWindow.add(reduced);
collector.collect(currentWindow);
reduced = null;
- numOfElements = 0;
return true;
} else {
return false;
@@ -59,16 +57,11 @@ public class TumblingPreReducer<T> implements WindowBuffer<T>, CompletePreAggreg
} else {
reduced = reducer.reduce(serializer.copy(reduced), element);
}
- numOfElements++;
}
public void evict(int n) {
}
- public int size() {
- return numOfElements;
- }
-
@Override
public TumblingPreReducer<T> clone() {
return new TumblingPreReducer<T>(reducer, serializer);
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
index e0429ab..2dd50db 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
@@ -34,8 +34,6 @@ public interface WindowBuffer<T> extends Serializable, Cloneable {
public boolean emitWindow(Collector<StreamWindow<T>> collector);
- public int size();
-
public WindowBuffer<T> clone();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index 25b5f87..bcfa188 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -89,6 +89,8 @@ public class WindowIntegrationTest implements Serializable {
inputs.add(11);
inputs.add(11);
+ KeySelector<Integer, ?> key = new ModKey(2);
+
Timestamp<Integer> ts = new Timestamp<Integer>() {
private static final long serialVersionUID = 1L;
@@ -103,14 +105,12 @@ public class WindowIntegrationTest implements Serializable {
DataStream<Integer> source = env.fromCollection(inputs);
- source.window(Time.of(2, ts)).every(Time.of(3, ts)).sum(0).getDiscretizedStream()
+ source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 1)).sum(0).getDiscretizedStream()
.addSink(new CentralSink1());
source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
.flatten().addSink(new CentralSink2());
- KeySelector<Integer, ?> key = new ModKey(2);
-
source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
.addSink(new DistributedSink1());
@@ -126,12 +126,17 @@ public class WindowIntegrationTest implements Serializable {
source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten()
.addSink(new DistributedSink4());
+ source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
+ .addSink(new DistributedSink5());
+
env.execute();
- // sum ( Time of 2 slide 3 )
+ // sum ( Time of 3 slide 2 )
List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
expected1.add(StreamWindow.fromElements(5));
+ expected1.add(StreamWindow.fromElements(11));
expected1.add(StreamWindow.fromElements(9));
+ expected1.add(StreamWindow.fromElements(10));
expected1.add(StreamWindow.fromElements(32));
validateOutput(expected1, CentralSink1.windows);
@@ -193,6 +198,13 @@ public class WindowIntegrationTest implements Serializable {
validateOutput(expected7, DistributedSink4.windows);
+ List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>();
+ expected8.add(StreamWindow.fromElements(12));
+ expected8.add(StreamWindow.fromElements(9));
+ expected8.add(StreamWindow.fromElements(32));
+
+ validateOutput(expected8, DistributedSink5.windows);
+
}
public static <R> void validateOutput(List<R> expected, List<R> actual) {
@@ -317,4 +329,21 @@ public class WindowIntegrationTest implements Serializable {
}
}
+
+ @SuppressWarnings("serial")
+ private static class DistributedSink5 implements SinkFunction<StreamWindow<Integer>> {
+
+ public static List<StreamWindow<Integer>> windows = Collections
+ .synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+ @Override
+ public void invoke(StreamWindow<Integer> value) throws Exception {
+ windows.add(value);
+ }
+
+ @Override
+ public void cancel() {
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
index bb756c7..967c719 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
@@ -41,8 +41,6 @@ public class BasicWindowBufferTest {
wb.store(2);
wb.store(10);
- assertEquals(2, wb.size());
-
wb.emitWindow(collector);
assertEquals(1, collected.size());
@@ -51,8 +49,6 @@ public class BasicWindowBufferTest {
wb.store(4);
wb.evict(2);
- assertEquals(1, wb.size());
-
wb.emitWindow(collector);
assertEquals(2, collected.size());
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
new file mode 100644
index 0000000..3ce65f1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.junit.Test;
+
+public class SlidingCountPreReducerTest {
+
+ TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
+
+ ReduceFunction<Integer> reducer = new SumReducer();
+
+ @Test
+ public void testPreReduce1() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
+ serializer, 3, 2, 0);
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.emitWindow(collector);
+ preReducer.store(3);
+ preReducer.store(4);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(9);
+ preReducer.store(10);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(11);
+ preReducer.store(12);
+ preReducer.emitWindow(collector);
+ preReducer.store(13);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(3));
+ expected.add(StreamWindow.fromElements(9));
+ expected.add(StreamWindow.fromElements(15));
+ expected.add(StreamWindow.fromElements(21));
+ expected.add(StreamWindow.fromElements(27));
+ expected.add(StreamWindow.fromElements(33));
+
+ assertEquals(expected, collector.getCollected());
+ }
+
+ @Test
+ public void testPreReduce2() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
+ serializer, 5, 2, 0);
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.emitWindow(collector);
+ preReducer.store(3);
+ preReducer.store(4);
+ preReducer.emitWindow(collector);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(9);
+ preReducer.store(10);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(11);
+ preReducer.store(12);
+ preReducer.emitWindow(collector);
+ preReducer.store(13);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(3));
+ expected.add(StreamWindow.fromElements(10));
+ expected.add(StreamWindow.fromElements(20));
+ expected.add(StreamWindow.fromElements(30));
+ expected.add(StreamWindow.fromElements(40));
+ expected.add(StreamWindow.fromElements(50));
+
+ assertEquals(expected, collector.getCollected());
+ }
+
+ @Test
+ public void testPreReduce3() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
+ serializer, 6, 3, 0);
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.store(3);
+ preReducer.emitWindow(collector);
+ preReducer.store(4);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.emitWindow(collector);
+ preReducer.evict(3);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.store(9);
+ preReducer.emitWindow(collector);
+ preReducer.evict(3);
+ preReducer.store(10);
+ preReducer.store(11);
+ preReducer.store(12);
+ preReducer.emitWindow(collector);
+ preReducer.evict(3);
+ preReducer.store(13);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(6));
+ expected.add(StreamWindow.fromElements(21));
+ expected.add(StreamWindow.fromElements(39));
+ expected.add(StreamWindow.fromElements(57));
+
+ assertEquals(expected, collector.getCollected());
+ }
+
+ @Test
+ public void testPreReduce4() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
+ serializer, 5, 1, 2);
+
+ preReducer.store(1);
+ preReducer.evict(1);
+ preReducer.store(1);
+ preReducer.evict(1);
+ preReducer.store(1);
+ preReducer.emitWindow(collector);
+ preReducer.store(2);
+ preReducer.emitWindow(collector);
+ preReducer.store(3);
+ preReducer.emitWindow(collector);
+ preReducer.store(4);
+ preReducer.emitWindow(collector);
+ preReducer.store(5);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+ preReducer.store(6);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+ preReducer.store(7);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+ preReducer.store(8);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(1));
+ expected.add(StreamWindow.fromElements(3));
+ expected.add(StreamWindow.fromElements(6));
+ expected.add(StreamWindow.fromElements(10));
+ expected.add(StreamWindow.fromElements(15));
+ expected.add(StreamWindow.fromElements(20));
+ expected.add(StreamWindow.fromElements(25));
+ expected.add(StreamWindow.fromElements(30));
+
+ assertEquals(expected, collector.getCollected());
+ }
+
+ private static class SumReducer implements ReduceFunction<Integer> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer reduce(Integer value1, Integer value2) throws Exception {
+ return value1 + value2;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
new file mode 100644
index 0000000..bc3b13b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.junit.Test;
+
+public class SlidingTimePreReducerTest {
+
+ TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
+
+ ReduceFunction<Integer> reducer = new SumReducer();
+
+ @Test
+ public void testPreReduce1() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
+ serializer, 3, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(Integer value) {
+ return value;
+ }
+ }, 1));
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.emitWindow(collector);
+ preReducer.store(3);
+ preReducer.store(4);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(9);
+ preReducer.store(10);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(11);
+ preReducer.store(12);
+ preReducer.emitWindow(collector);
+ preReducer.store(13);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(3));
+ expected.add(StreamWindow.fromElements(9));
+ expected.add(StreamWindow.fromElements(15));
+ expected.add(StreamWindow.fromElements(21));
+ expected.add(StreamWindow.fromElements(27));
+ expected.add(StreamWindow.fromElements(33));
+
+ assertEquals(expected, collector.getCollected());
+ }
+
+ @Test
+ public void testPreReduce2() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
+ serializer, 5, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(Integer value) {
+ return value;
+ }
+ }, 1));
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.emitWindow(collector);
+ preReducer.store(3);
+ preReducer.store(4);
+ preReducer.emitWindow(collector);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(9);
+ preReducer.store(10);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(11);
+ preReducer.store(12);
+ preReducer.emitWindow(collector);
+ preReducer.store(13);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(3));
+ expected.add(StreamWindow.fromElements(10));
+ expected.add(StreamWindow.fromElements(20));
+ expected.add(StreamWindow.fromElements(30));
+ expected.add(StreamWindow.fromElements(40));
+ expected.add(StreamWindow.fromElements(50));
+
+ assertEquals(expected, collector.getCollected());
+ }
+
+ @Test
+ public void testPreReduce3() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
+ serializer, 6, 3, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(Integer value) {
+ return value;
+ }
+ }, 1));
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.store(3);
+ preReducer.emitWindow(collector);
+ preReducer.store(4);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.emitWindow(collector);
+ preReducer.evict(3);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.store(9);
+ preReducer.emitWindow(collector);
+ preReducer.evict(3);
+ preReducer.store(10);
+ preReducer.store(11);
+ preReducer.store(12);
+ preReducer.emitWindow(collector);
+ preReducer.evict(3);
+ preReducer.store(13);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(6));
+ expected.add(StreamWindow.fromElements(21));
+ expected.add(StreamWindow.fromElements(39));
+ expected.add(StreamWindow.fromElements(57));
+
+ assertEquals(expected, collector.getCollected());
+ }
+
+ @Test
+ public void testPreReduce4() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
+ serializer, 3, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(Integer value) {
+ return value;
+ }
+ }, 1));
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.emitWindow(collector);
+ preReducer.store(3);
+ preReducer.store(4);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.emitWindow(collector);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(14);
+ preReducer.emitWindow(collector);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.emitWindow(collector);
+ preReducer.store(21);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+
+ preReducer.store(9);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(3));
+ expected.add(StreamWindow.fromElements(9));
+ expected.add(StreamWindow.fromElements(15));
+ expected.add(StreamWindow.fromElements(21));
+ expected.add(StreamWindow.fromElements(8));
+ expected.add(StreamWindow.fromElements(8));
+ expected.add(StreamWindow.fromElements(14));
+ expected.add(StreamWindow.fromElements(14));
+ expected.add(StreamWindow.fromElements(21));
+
+ assertEquals(expected, collector.getCollected());
+ }
+
+ private static class SumReducer implements ReduceFunction<Integer> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer reduce(Integer value1, Integer value2) throws Exception {
+ return value1 + value2;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
index 437fcd6..95aace0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
@@ -67,9 +67,6 @@ public class TumblingGroupedPreReducerTest {
wb.store(serializer.copy(inputs.get(0)));
wb.store(serializer.copy(inputs.get(1)));
-
- assertEquals(2, wb.size());
-
wb.emitWindow(collector);
assertEquals(1, collected.size());
@@ -77,9 +74,6 @@ public class TumblingGroupedPreReducerTest {
assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(1, 1),
new Tuple2<Integer, Integer>(0, 0)), collected.get(0));
- // Test automatic eviction
- assertEquals(0, wb.size());
-
wb.store(serializer.copy(inputs.get(0)));
wb.store(serializer.copy(inputs.get(1)));
wb.store(serializer.copy(inputs.get(2)));
@@ -89,7 +83,6 @@ public class TumblingGroupedPreReducerTest {
wb.store(serializer.copy(inputs.get(3)));
- assertEquals(4, wb.size());
wb.emitWindow(collector);
assertEquals(2, collected.size());
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
index 31be227..ddaf747 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
@@ -58,17 +58,12 @@ public class TumblingPreReducerTest {
wb.store(serializer.copy(inputs.get(0)));
wb.store(serializer.copy(inputs.get(1)));
- assertEquals(2, wb.size());
-
wb.emitWindow(collector);
assertEquals(1, collected.size());
assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(3, 1)),
collected.get(0));
- // Test automatic eviction
- assertEquals(0, wb.size());
-
wb.store(serializer.copy(inputs.get(0)));
wb.store(serializer.copy(inputs.get(1)));
wb.store(serializer.copy(inputs.get(2)));
@@ -78,7 +73,6 @@ public class TumblingPreReducerTest {
wb.store(serializer.copy(inputs.get(3)));
- assertEquals(4, wb.size());
wb.emitWindow(collector);
assertEquals(2, collected.size());