You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/11 01:23:11 UTC
[1/5] flink git commit: [FLINK-1643] [streaming] Auto detection of
tumbling policies added + WindowedDataStream refactor
Repository: flink
Updated Branches:
refs/heads/master 2bba2b3f0 -> 2522f028b
[FLINK-1643] [streaming] Auto detection of tumbling policies added + WindowedDataStream refactor
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aacd4f29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aacd4f29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aacd4f29
Branch: refs/heads/master
Commit: aacd4f2916e6d7cc79b0891609d6485b1ec72ef2
Parents: 70abc16
Author: Gyula Fora <gy...@apache.org>
Authored: Sun Mar 8 14:29:30 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue Mar 10 16:33:43 2015 +0100
----------------------------------------------------------------------
.../api/datastream/WindowedDataStream.java | 57 +++++-------
.../windowing/ParallelWindowPartitioner.java | 84 +++++++++++++++++
.../windowing/WindowBufferInvokable.java | 3 -
.../streaming/api/windowing/StreamWindow.java | 4 +
.../streaming/api/windowing/WindowUtils.java | 93 +++++++++++++++++++
.../streaming/api/windowing/helper/Time.java | 17 +---
.../windowing/policy/CountEvictionPolicy.java | 4 +
.../windowing/policy/CountTriggerPolicy.java | 4 +
.../api/windowing/policy/TimeTriggerPolicy.java | 39 ++------
.../windowbuffer/SlidingCountPreReducer.java | 25 ++---
.../windowbuffer/SlidingPreReducer.java | 96 ++++++++++++--------
.../windowbuffer/SlidingTimePreReducer.java | 11 +--
.../windowing/StreamDiscretizerTest.java | 2 +-
13 files changed, 300 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 4c7da88..6ce9f9f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -46,14 +46,11 @@ import org.apache.flink.streaming.api.windowing.WindowEvent;
import org.apache.flink.streaming.api.windowing.WindowUtils;
import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation;
import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
@@ -261,7 +258,7 @@ public class WindowedDataStream<OUT> {
public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {
WindowTransformation transformation = WindowTransformation.REDUCEWINDOW
- .with(reduceFunction);
+ .with(clean(reduceFunction));
WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation);
@@ -288,7 +285,7 @@ public class WindowedDataStream<OUT> {
* @return The transformed DataStream
*/
public <R> WindowedDataStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) {
- return discretize(WindowTransformation.MAPWINDOW.with(windowMapFunction),
+ return discretize(WindowTransformation.MAPWINDOW.with(clean(windowMapFunction)),
new BasicWindowBuffer<OUT>()).mapWindow(windowMapFunction);
}
@@ -370,42 +367,36 @@ public class WindowedDataStream<OUT> {
@SuppressWarnings("unchecked")
private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation) {
+ TriggerPolicy<OUT> trigger = getTrigger();
+ EvictionPolicy<OUT> eviction = getEviction();
if (transformation == WindowTransformation.REDUCEWINDOW) {
- if (getTrigger() instanceof TumblingEvictionPolicy) {
+ if (WindowUtils.isTumblingPolicy(trigger, eviction)) {
if (groupByKey == null) {
return new TumblingPreReducer<OUT>(
- clean((ReduceFunction<OUT>) transformation.getUDF()), getType()
+ (ReduceFunction<OUT>) transformation.getUDF(), getType()
.createSerializer(getExecutionConfig()));
} else {
return new TumblingGroupedPreReducer<OUT>(
- clean((ReduceFunction<OUT>) transformation.getUDF()), groupByKey,
- getType().createSerializer(getExecutionConfig()));
- }
- } else if (getTrigger() instanceof CountTriggerPolicy
- && getEviction() instanceof CountEvictionPolicy && groupByKey == null) {
-
- int slide = ((CountTriggerPolicy<OUT>) getTrigger()).getSlideSize();
- int window = ((CountEvictionPolicy<OUT>) getEviction()).getWindowSize();
- int start = ((CountEvictionPolicy<OUT>) getEviction()).getStart();
- if (slide < window) {
- return new SlidingCountPreReducer<OUT>(
- clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
- .getType().createSerializer(getExecutionConfig()), window,
- slide, start);
- }
- } else if (getTrigger() instanceof TimeTriggerPolicy
- && getEviction() instanceof TimeEvictionPolicy && groupByKey == null) {
- int slide = (int) ((TimeTriggerPolicy<OUT>) getTrigger()).getSlideSize();
- int window = (int) ((TimeEvictionPolicy<OUT>) getEviction()).getWindowSize();
- TimestampWrapper<OUT> wrapper = ((TimeEvictionPolicy<OUT>) getEviction())
- .getTimeStampWrapper();
- if (slide < window) {
- return new SlidingTimePreReducer<OUT>(
- clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
- .getType().createSerializer(getExecutionConfig()), window,
- slide, wrapper);
+ (ReduceFunction<OUT>) transformation.getUDF(), groupByKey, getType()
+ .createSerializer(getExecutionConfig()));
}
+ } else if (WindowUtils.isSlidingCountPolicy(trigger, eviction) && groupByKey == null) {
+
+ return new SlidingCountPreReducer<OUT>(
+ clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream.getType()
+ .createSerializer(getExecutionConfig()),
+ WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
+ ((CountTriggerPolicy<?>) trigger).getStart());
+
+ } else if (WindowUtils.isSlidingTimePolicy(trigger, eviction) && groupByKey == null) {
+
+ return new SlidingTimePreReducer<OUT>(
+ (ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
+ .createSerializer(getExecutionConfig()),
+ WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
+ WindowUtils.getTimeStampWrapper(trigger));
+
}
}
return new BasicWindowBuffer<OUT>();
http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelWindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelWindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelWindowPartitioner.java
new file mode 100644
index 0000000..32778da
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelWindowPartitioner.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+
+/**
+ * This invokable applies either split or key partitioning depending on the
+ * transformation.
+ */
+public class ParallelWindowPartitioner<T> extends
+ ChainableInvokable<StreamWindow<T>, StreamWindow<T>> {
+
+ private KeySelector<T, ?> keySelector;
+ private int numberOfSplits;
+ private int currentWindowID = 0;
+
+ public ParallelWindowPartitioner(KeySelector<T, ?> keySelector) {
+ super(null);
+ this.keySelector = keySelector;
+ }
+
+ public ParallelWindowPartitioner(int numberOfSplits) {
+ super(null);
+ this.numberOfSplits = numberOfSplits;
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void invoke() throws Exception {
+ while (isRunning && readNext() != null) {
+ callUserFunctionAndLogException();
+ }
+ }
+
+ @Override
+ protected void callUserFunction() throws Exception {
+ StreamWindow<T> currentWindow = nextObject;
+ currentWindow.setID(++currentWindowID);
+
+ if (keySelector == null) {
+ if (numberOfSplits <= 1) {
+ collector.collect(currentWindow);
+ } else {
+ for (StreamWindow<T> window : currentWindow.split(numberOfSplits)) {
+ collector.collect(window);
+ }
+ }
+ } else {
+
+ for (StreamWindow<T> window : currentWindow.partitionBy(keySelector)) {
+ collector.collect(window);
+ }
+
+ }
+ }
+
+ @Override
+ public void collect(StreamWindow<T> record) {
+ if (isRunning) {
+ nextObject = record;
+ callUserFunctionAndLogException();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
index 72be823..75f7d9d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
@@ -52,13 +52,10 @@ public class WindowBufferInvokable<T> extends ChainableInvokable<WindowEvent<T>,
protected void handleWindowEvent(WindowEvent<T> windowEvent, WindowBuffer<T> buffer)
throws Exception {
if (windowEvent.isElement()) {
- System.out.println("element: " + windowEvent.getElement());
buffer.store(windowEvent.getElement());
} else if (windowEvent.isEviction()) {
- System.out.println("eviction: " + windowEvent.getEviction());
buffer.evict(windowEvent.getEviction());
} else if (windowEvent.isTrigger()) {
- System.out.println("trigger");
buffer.emitWindow(collector);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
index 49b40f5..b45babb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
@@ -172,6 +172,10 @@ public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
return this;
}
+ public void setID(int id) {
+ this.windowID = id;
+ }
+
/**
* Checks whether this window can be merged with the given one.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
index 1e3ba86..246aff2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.windowing;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
@@ -48,10 +49,102 @@ public class WindowUtils {
|| (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy);
}
+ public static boolean isSlidingTimePolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
+ if (isTimeOnly(trigger, eviction)) {
+ long slide = getSlideSize(trigger);
+ long window = getWindowSize(eviction);
+
+ return slide < window
+ && getTimeStampWrapper(trigger).equals(getTimeStampWrapper(eviction));
+ } else {
+ return false;
+ }
+ }
+
+ public static boolean isSlidingCountPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
+ if (isCountOnly(trigger, eviction)) {
+ long slide = getSlideSize(trigger);
+ long window = getWindowSize(eviction);
+
+ return slide < window
+ && ((CountTriggerPolicy<?>) trigger).getStart() == ((CountEvictionPolicy<?>) eviction)
+ .getStart()
+ && ((CountEvictionPolicy<?>) eviction).getDeleteOnEviction() == 1;
+ } else {
+ return false;
+ }
+ }
+
+ public static <X> TimestampWrapper<X> getTimeStampWrapper(TriggerPolicy<X> trigger) {
+ if (trigger instanceof TimeTriggerPolicy) {
+ return ((TimeTriggerPolicy<X>) trigger).getTimeStampWrapper();
+ } else {
+ throw new IllegalArgumentException(
+ "Timestamp wrapper can only be accessed for time policies");
+ }
+ }
+
+ public static <X> TimestampWrapper<X> getTimeStampWrapper(EvictionPolicy<X> eviction) {
+ if (eviction instanceof EvictionPolicy) {
+ return ((TimeEvictionPolicy<X>) eviction).getTimeStampWrapper();
+ } else {
+ throw new IllegalArgumentException(
+ "Timestamp wrapper can only be accessed for time policies");
+ }
+ }
+
+ public static long getSlideSize(TriggerPolicy<?> trigger) {
+ if (trigger instanceof TimeTriggerPolicy) {
+ return ((TimeTriggerPolicy<?>) trigger).getSlideSize();
+ } else if (trigger instanceof CountTriggerPolicy) {
+ return ((CountTriggerPolicy<?>) trigger).getSlideSize();
+ } else {
+ throw new IllegalArgumentException(
+ "Slide size can only be accessed for time or count policies");
+ }
+ }
+
+ public static long getWindowSize(EvictionPolicy<?> eviction) {
+ if (eviction instanceof TimeEvictionPolicy) {
+ return ((TimeEvictionPolicy<?>) eviction).getWindowSize();
+ } else if (eviction instanceof CountEvictionPolicy) {
+ return ((CountEvictionPolicy<?>) eviction).getWindowSize();
+ } else {
+ throw new IllegalArgumentException(
+ "Window size can only be accessed for time or count policies");
+ }
+ }
+
+ public static boolean isTumblingPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
+ if (eviction instanceof TumblingEvictionPolicy) {
+ return true;
+ } else if (isTimeOnly(trigger, eviction)) {
+ long slide = getSlideSize(trigger);
+ long window = getWindowSize(eviction);
+
+ return slide == window
+ && getTimeStampWrapper(trigger).equals(getTimeStampWrapper(eviction));
+ } else if (isCountOnly(trigger, eviction)) {
+ long slide = getSlideSize(trigger);
+ long window = getWindowSize(eviction);
+
+ return slide == window
+ && ((CountTriggerPolicy<?>) trigger).getStart() == ((CountEvictionPolicy<?>) eviction)
+ .getStart()
+ && ((CountEvictionPolicy<?>) eviction).getDeleteOnEviction() == 1;
+ } else {
+ return false;
+ }
+ }
+
public static boolean isTimeOnly(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
return trigger instanceof TimeTriggerPolicy && eviction instanceof TimeEvictionPolicy;
}
+ public static boolean isCountOnly(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
+ return trigger instanceof CountTriggerPolicy && eviction instanceof CountEvictionPolicy;
+ }
+
public static boolean isSystemTimeTrigger(TriggerPolicy<?> trigger) {
return trigger instanceof TimeTriggerPolicy
&& ((TimeTriggerPolicy<?>) trigger).timestampWrapper.isDefaultTimestamp();
http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
index f94eea4..da8d929 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
@@ -80,7 +80,6 @@ public class Time<DATA> implements WindowingHelper<DATA> {
this.length = length;
this.granularity = timeUnit;
this.timestampWrapper = timestampWrapper;
- this.delay = 0;
}
@Override
@@ -90,7 +89,7 @@ public class Time<DATA> implements WindowingHelper<DATA> {
@Override
public TriggerPolicy<DATA> toTrigger() {
- return new TimeTriggerPolicy<DATA>(granularityInMillis(), timestampWrapper, delay);
+ return new TimeTriggerPolicy<DATA>(granularityInMillis(), timestampWrapper);
}
/**
@@ -147,19 +146,7 @@ public class Time<DATA> implements WindowingHelper<DATA> {
public static <DATA> Time<DATA> of(long length, Timestamp<DATA> timestamp) {
return of(length, timestamp, 0);
}
-
- /**
- * Sets the delay for the first processed window.
- *
- * @param delay
- * The number of time units before the first processed window.
- * @return Helper representing the time based trigger and eviction policy
- */
- public Time<DATA> withDelay(long delay) {
- this.delay = delay;
- return this;
- }
-
+
protected long granularityInMillis() {
return granularity == null ? length : granularity.toMillis(length);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
index 07478c6..5e0c862 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
@@ -143,6 +143,10 @@ public class CountEvictionPolicy<IN> implements CloneableEvictionPolicy<IN> {
public int getStart() {
return startValue;
}
+
+ public int getDeleteOnEviction(){
+ return deleteOnEviction;
+ }
@Override
public String toString() {
http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
index d439f72..9bd6f82 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
@@ -105,6 +105,10 @@ public class CountTriggerPolicy<IN> implements CloneableTriggerPolicy<IN> {
public int getSlideSize() {
return max;
}
+
+ public int getStart() {
+ return startValue;
+ }
@Override
public String toString() {
http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
index ce8f16e..4c8a942 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
@@ -43,30 +43,6 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
protected long startTime;
public long granularity;
public TimestampWrapper<DATA> timestampWrapper;
- protected long delay;
-
- /**
- * This trigger policy triggers with regard to the time. The is measured
- * using a given {@link Timestamp} implementation. A point in time is always
- * represented as long. Therefore, parameters such as granularity can be set
- * as long value as well. If this value for the granularity is set to 2 for
- * example, the policy will trigger at every second point in time.
- *
- * @param granularity
- * The granularity of the trigger. If this value is set to x the
- * policy will trigger at every x-th time point
- * @param timestampWrapper
- * The {@link TimestampWrapper} to measure the time with. This
- * can be either user defined of provided by the API.
- * @param timeWrapper
- * This policy creates fake elements to not miss windows in case
- * no element arrived within the duration of the window. This
- * extractor should wrap a long into such an element of type
- * DATA.
- */
- public TimeTriggerPolicy(long granularity, TimestampWrapper<DATA> timestampWrapper) {
- this(granularity, timestampWrapper, 0);
- }
/**
* This is mostly the same as
@@ -81,21 +57,16 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
* @param timestampWrapper
* The {@link TimestampWrapper} to measure the time with. This
* can be either user defined of provided by the API.
- * @param delay
- * A delay for the first trigger. If the start time given by the
- * timestamp is x, the delay is y, and the granularity is z, the
- * first trigger will happen at x+y+z.
* @param timeWrapper
* This policy creates fake elements to not miss windows in case
* no element arrived within the duration of the window. This
* extractor should wrap a long into such an element of type
* DATA.
*/
- public TimeTriggerPolicy(long granularity, TimestampWrapper<DATA> timestampWrapper, long delay) {
- this.startTime = timestampWrapper.getStartTime() + delay;
+ public TimeTriggerPolicy(long granularity, TimestampWrapper<DATA> timestampWrapper) {
+ this.startTime = timestampWrapper.getStartTime();
this.timestampWrapper = timestampWrapper;
this.granularity = granularity;
- this.delay = delay;
}
/**
@@ -194,7 +165,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
@Override
public TimeTriggerPolicy<DATA> clone() {
- return new TimeTriggerPolicy<DATA>(granularity, timestampWrapper, delay);
+ return new TimeTriggerPolicy<DATA>(granularity, timestampWrapper);
}
@Override
@@ -222,5 +193,9 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
return "TimePolicy(" + granularity + ", " + timestampWrapper.getClass().getSimpleName()
+ ")";
}
+
+ public TimestampWrapper<DATA> getTimeStampWrapper() {
+ return timestampWrapper;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
index ff358cb..a1216f3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
@@ -27,12 +27,14 @@ public class SlidingCountPreReducer<T> extends SlidingPreReducer<T> {
private static final long serialVersionUID = 1L;
- private int windowSize;
- private int slideSize;
+ private long windowSize;
+ private long slideSize;
private int start;
+ protected long index = 0;
+
public SlidingCountPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
- int windowSize, int slideSize, int start) {
+ long windowSize, long slideSize, int start) {
super(reducer, serializer);
if (windowSize > slideSize) {
this.windowSize = windowSize;
@@ -46,8 +48,8 @@ public class SlidingCountPreReducer<T> extends SlidingPreReducer<T> {
}
@Override
- public SlidingCountPreReducer<T> clone() {
- return new SlidingCountPreReducer<T>(reducer, serializer, windowSize, slideSize, start);
+ protected void afterStore() {
+ index++;
}
@Override
@@ -60,12 +62,7 @@ public class SlidingCountPreReducer<T> extends SlidingPreReducer<T> {
}
@Override
- public String toString() {
- return currentReduced.toString();
- }
-
- @Override
- protected boolean addCurrentToReduce(T next) {
+ protected boolean currentEligible(T next) {
if (index <= slideSize) {
return true;
} else {
@@ -74,10 +71,14 @@ public class SlidingCountPreReducer<T> extends SlidingPreReducer<T> {
}
@Override
- protected void updateIndexAtEmit() {
+ protected void afterEmit() {
if (index >= slideSize) {
index = index - slideSize;
}
}
+ @Override
+ public SlidingCountPreReducer<T> clone() {
+ return new SlidingCountPreReducer<T>(reducer, serializer, windowSize, slideSize, start);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
index a21bb61..1dd126d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
@@ -36,7 +36,6 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
protected TypeSerializer<T> serializer;
- protected int index = 0;
protected int toRemove = 0;
protected int elementsSinceLastPreAggregate = 0;
@@ -48,64 +47,87 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
public boolean emitWindow(Collector<StreamWindow<T>> collector) {
StreamWindow<T> currentWindow = new StreamWindow<T>();
- T finalAggregate = getFinalAggregate();
- if (finalAggregate != null) {
- currentWindow.add(finalAggregate);
- collector.collect(currentWindow);
- updateIndexAtEmit();
- return true;
- } else {
- updateIndexAtEmit();
- return false;
+
+ try {
+ if (addFinalAggregate(currentWindow)) {
+ collector.collect(currentWindow);
+ afterEmit();
+ return true;
+ } else {
+ afterEmit();
+ return false;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
- protected abstract void updateIndexAtEmit();
+ protected void afterEmit() {
+ // Do nothing by default
+ }
- public T getFinalAggregate() {
- try {
- if (!reduced.isEmpty()) {
- T finalReduce = reduced.get(0);
- for (int i = 1; i < reduced.size(); i++) {
- finalReduce = reducer.reduce(finalReduce, serializer.copy(reduced.get(i)));
-
- }
- if (currentReduced != null) {
- finalReduce = reducer.reduce(finalReduce, serializer.copy(currentReduced));
- }
- return finalReduce;
- } else {
- return currentReduced;
+ public boolean addFinalAggregate(StreamWindow<T> currentWindow) throws Exception {
+ T finalReduce = null;
+
+ if (!reduced.isEmpty()) {
+ finalReduce = reduced.get(0);
+ for (int i = 1; i < reduced.size(); i++) {
+ finalReduce = reducer.reduce(finalReduce, serializer.copy(reduced.get(i)));
+
+ }
+ if (currentReduced != null) {
+ finalReduce = reducer.reduce(finalReduce, serializer.copy(currentReduced));
}
- } catch (Exception e) {
- throw new RuntimeException(e);
+ } else {
+ finalReduce = currentReduced;
}
+
+ if (finalReduce != null) {
+ currentWindow.add(finalReduce);
+ return true;
+ } else {
+ return false;
+ }
+
}
public void store(T element) throws Exception {
addToBufferIfEligible(element);
- index++;
+ afterStore();
+ }
+
+ protected void afterStore() {
+ // Do nothing by default
}
protected void addToBufferIfEligible(T element) throws Exception {
- if (addCurrentToReduce(element) && currentReduced != null) {
- reduced.add(currentReduced);
+ if (currentEligible(element) && currentReduced != null) {
+ addCurrentToBuffer(element);
elementsPerPreAggregate.add(elementsSinceLastPreAggregate);
- currentReduced = element;
elementsSinceLastPreAggregate = 1;
} else {
- if (currentReduced == null) {
- currentReduced = element;
- } else {
- currentReduced = reducer.reduce(serializer.copy(currentReduced), element);
- }
+ updateCurrent(element);
+
elementsSinceLastPreAggregate++;
}
}
- protected abstract boolean addCurrentToReduce(T next);
+ protected void updateCurrent(T element) throws Exception {
+ if (currentReduced == null) {
+ currentReduced = element;
+ } else {
+ currentReduced = reducer.reduce(serializer.copy(currentReduced), element);
+ }
+ }
+
+ protected void addCurrentToBuffer(T element) {
+ reduced.add(currentReduced);
+ currentReduced = element;
+ }
+
+ protected abstract boolean currentEligible(T next);
public void evict(int n) {
toRemove += n;
http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
index af2239c..bf3ec98 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
@@ -28,14 +28,14 @@ public class SlidingTimePreReducer<T> extends SlidingPreReducer<T> {
private static final long serialVersionUID = 1L;
- private int windowSize;
- private int slideSize;
+ private long windowSize;
+ private long slideSize;
private TimestampWrapper<T> timestampWrapper;
private T lastStored;
protected long windowStartTime;
public SlidingTimePreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
- int windowSize, int slideSize, TimestampWrapper<T> timestampWrapper) {
+ long windowSize, long slideSize, TimestampWrapper<T> timestampWrapper) {
super(reducer, serializer);
if (windowSize > slideSize) {
this.windowSize = windowSize;
@@ -66,8 +66,7 @@ public class SlidingTimePreReducer<T> extends SlidingPreReducer<T> {
}
@Override
- protected void updateIndexAtEmit() {
- index = 0;
+ protected void afterEmit() {
long lastTime = timestampWrapper.getTimestamp(lastStored);
if (lastTime - windowStartTime >= slideSize) {
windowStartTime = windowStartTime + slideSize;
@@ -92,7 +91,7 @@ public class SlidingTimePreReducer<T> extends SlidingPreReducer<T> {
}
@Override
- protected boolean addCurrentToReduce(T next) {
+ protected boolean currentEligible(T next) {
return windowStartTime == timestampWrapper.getStartTime()
|| timestampWrapper.getTimestamp(next) - windowStartTime >= slideSize;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/aacd4f29/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizerTest.java
index 0b7b51f..bd81c20 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizerTest.java
@@ -72,7 +72,7 @@ public class StreamDiscretizerTest {
};
TriggerPolicy<Integer> trigger = new TimeTriggerPolicy<Integer>(2L,
- new TimestampWrapper<Integer>(myTimeStamp, 1), 2L);
+ new TimestampWrapper<Integer>(myTimeStamp, 3));
EvictionPolicy<Integer> eviction = new TimeEvictionPolicy<Integer>(4L,
new TimestampWrapper<Integer>(myTimeStamp, 1));
[2/5] flink git commit: [FLINK-1619] [FLINK-1620] Basic sliding
prereducers added for Time and Count
Posted by gy...@apache.org.
[FLINK-1619] [FLINK-1620] Basic sliding prereducers added for Time and Count
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70abc16d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70abc16d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70abc16d
Branch: refs/heads/master
Commit: 70abc16df78d47c62b7cc7f1545542b330562113
Parents: 2be00ac
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Mar 4 18:22:13 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue Mar 10 16:33:43 2015 +0100
----------------------------------------------------------------------
.../api/datastream/WindowedDataStream.java | 51 +++-
.../windowing/policy/CountEvictionPolicy.java | 8 +
.../windowing/policy/CountTriggerPolicy.java | 4 +
.../windowing/policy/TimeEvictionPolicy.java | 8 +
.../api/windowing/policy/TimeTriggerPolicy.java | 4 +
.../windowbuffer/BasicWindowBuffer.java | 4 -
.../windowbuffer/SlidingCountPreReducer.java | 83 ++++++
.../windowbuffer/SlidingPreReducer.java | 141 ++++++++++
.../windowbuffer/SlidingTimePreReducer.java | 99 +++++++
.../windowbuffer/TumblingGroupedPreReducer.java | 7 -
.../windowbuffer/TumblingPreReducer.java | 7 -
.../windowing/windowbuffer/WindowBuffer.java | 2 -
.../windowing/WindowIntegrationTest.java | 37 ++-
.../windowbuffer/BasicWindowBufferTest.java | 4 -
.../SlidingCountPreReducerTest.java | 216 ++++++++++++++++
.../windowbuffer/SlidingTimePreReducerTest.java | 257 +++++++++++++++++++
.../TumblingGroupedPreReducerTest.java | 7 -
.../windowbuffer/TumblingPreReducerTest.java | 6 -
18 files changed, 894 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 742ff22..4c7da88 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -46,15 +46,21 @@ import org.apache.flink.streaming.api.windowing.WindowEvent;
import org.apache.flink.streaming.api.windowing.WindowUtils;
import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation;
import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
import org.apache.flink.streaming.api.windowing.windowbuffer.CompletePreAggregator;
+import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountPreReducer;
+import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimePreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
@@ -365,16 +371,41 @@ public class WindowedDataStream<OUT> {
@SuppressWarnings("unchecked")
private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation) {
- if (transformation == WindowTransformation.REDUCEWINDOW
- && getEviction() instanceof TumblingEvictionPolicy) {
- if (groupByKey == null) {
- return new TumblingPreReducer<OUT>(
- clean((ReduceFunction<OUT>) transformation.getUDF()), getType()
- .createSerializer(getExecutionConfig()));
- } else {
- return new TumblingGroupedPreReducer<OUT>(
- clean((ReduceFunction<OUT>) transformation.getUDF()), groupByKey, getType()
- .createSerializer(getExecutionConfig()));
+ if (transformation == WindowTransformation.REDUCEWINDOW) {
+ if (getTrigger() instanceof TumblingEvictionPolicy) {
+ if (groupByKey == null) {
+ return new TumblingPreReducer<OUT>(
+ clean((ReduceFunction<OUT>) transformation.getUDF()), getType()
+ .createSerializer(getExecutionConfig()));
+ } else {
+ return new TumblingGroupedPreReducer<OUT>(
+ clean((ReduceFunction<OUT>) transformation.getUDF()), groupByKey,
+ getType().createSerializer(getExecutionConfig()));
+ }
+ } else if (getTrigger() instanceof CountTriggerPolicy
+ && getEviction() instanceof CountEvictionPolicy && groupByKey == null) {
+
+ int slide = ((CountTriggerPolicy<OUT>) getTrigger()).getSlideSize();
+ int window = ((CountEvictionPolicy<OUT>) getEviction()).getWindowSize();
+ int start = ((CountEvictionPolicy<OUT>) getEviction()).getStart();
+ if (slide < window) {
+ return new SlidingCountPreReducer<OUT>(
+ clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+ .getType().createSerializer(getExecutionConfig()), window,
+ slide, start);
+ }
+ } else if (getTrigger() instanceof TimeTriggerPolicy
+ && getEviction() instanceof TimeEvictionPolicy && groupByKey == null) {
+ int slide = (int) ((TimeTriggerPolicy<OUT>) getTrigger()).getSlideSize();
+ int window = (int) ((TimeEvictionPolicy<OUT>) getEviction()).getWindowSize();
+ TimestampWrapper<OUT> wrapper = ((TimeEvictionPolicy<OUT>) getEviction())
+ .getTimeStampWrapper();
+ if (slide < window) {
+ return new SlidingTimePreReducer<OUT>(
+ clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+ .getType().createSerializer(getExecutionConfig()), window,
+ slide, wrapper);
+ }
}
}
return new BasicWindowBuffer<OUT>();
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
index 3ede27b..07478c6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
@@ -136,6 +136,14 @@ public class CountEvictionPolicy<IN> implements CloneableEvictionPolicy<IN> {
}
}
+ public int getWindowSize() {
+ return maxElements;
+ }
+
+ public int getStart() {
+ return startValue;
+ }
+
@Override
public String toString() {
return "CountPolicy(" + maxElements + ")";
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
index 6d8149a..d439f72 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
@@ -102,6 +102,10 @@ public class CountTriggerPolicy<IN> implements CloneableTriggerPolicy<IN> {
}
}
+ public int getSlideSize() {
+ return max;
+ }
+
@Override
public String toString() {
return "CountPolicy(" + max + ")";
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
index 982b6d5..ae17e29 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
@@ -150,10 +150,18 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
}
}
+ public long getWindowSize() {
+ return granularity;
+ }
+
@Override
public String toString() {
return "TimePolicy(" + granularity + ", " + timestampWrapper.getClass().getSimpleName()
+ ")";
}
+ public TimestampWrapper<DATA> getTimeStampWrapper() {
+ return timestampWrapper;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
index 7065582..ce8f16e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
@@ -213,6 +213,10 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
}
}
+ public long getSlideSize() {
+ return granularity;
+ }
+
@Override
public String toString() {
return "TimePolicy(" + granularity + ", " + timestampWrapper.getClass().getSimpleName()
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
index 458de41..8e39398 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
@@ -63,10 +63,6 @@ public class BasicWindowBuffer<T> implements WindowBuffer<T> {
}
}
- public int size() {
- return buffer.size();
- }
-
@Override
public BasicWindowBuffer<T> clone() {
return new BasicWindowBuffer<T>();
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
new file mode 100644
index 0000000..ff358cb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Non-grouped pre-reducer for tumbling eviction policy.
+ */
+public class SlidingCountPreReducer<T> extends SlidingPreReducer<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private int windowSize;
+ private int slideSize;
+ private int start;
+
+ public SlidingCountPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+ int windowSize, int slideSize, int start) {
+ super(reducer, serializer);
+ if (windowSize > slideSize) {
+ this.windowSize = windowSize;
+ this.slideSize = slideSize;
+ this.start = start;
+ } else {
+ throw new RuntimeException(
+ "Window size needs to be larger than slide size for the sliding pre-reducer");
+ }
+ index = index - start;
+ }
+
+ @Override
+ public SlidingCountPreReducer<T> clone() {
+ return new SlidingCountPreReducer<T>(reducer, serializer, windowSize, slideSize, start);
+ }
+
+ @Override
+ public void store(T element) throws Exception {
+ if (index >= 0) {
+ super.store(element);
+ } else {
+ index++;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return currentReduced.toString();
+ }
+
+ @Override
+ protected boolean addCurrentToReduce(T next) {
+ if (index <= slideSize) {
+ return true;
+ } else {
+ return index == windowSize;
+ }
+ }
+
+ @Override
+ protected void updateIndexAtEmit() {
+ if (index >= slideSize) {
+ index = index - slideSize;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
new file mode 100644
index 0000000..a21bb61
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import java.util.LinkedList;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.util.Collector;
+
+public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompletePreAggregator {
+
+ private static final long serialVersionUID = 1L;
+
+ protected ReduceFunction<T> reducer;
+
+ protected T currentReduced;
+ protected LinkedList<T> reduced = new LinkedList<T>();
+ protected LinkedList<Integer> elementsPerPreAggregate = new LinkedList<Integer>();
+
+ protected TypeSerializer<T> serializer;
+
+ protected int index = 0;
+ protected int toRemove = 0;
+
+ protected int elementsSinceLastPreAggregate = 0;
+
+ public SlidingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer) {
+ this.reducer = reducer;
+ this.serializer = serializer;
+ }
+
+ public boolean emitWindow(Collector<StreamWindow<T>> collector) {
+ StreamWindow<T> currentWindow = new StreamWindow<T>();
+ T finalAggregate = getFinalAggregate();
+ if (finalAggregate != null) {
+ currentWindow.add(finalAggregate);
+ collector.collect(currentWindow);
+ updateIndexAtEmit();
+ return true;
+ } else {
+ updateIndexAtEmit();
+ return false;
+ }
+
+ }
+
+ protected abstract void updateIndexAtEmit();
+
+ public T getFinalAggregate() {
+ try {
+ if (!reduced.isEmpty()) {
+ T finalReduce = reduced.get(0);
+ for (int i = 1; i < reduced.size(); i++) {
+ finalReduce = reducer.reduce(finalReduce, serializer.copy(reduced.get(i)));
+
+ }
+ if (currentReduced != null) {
+ finalReduce = reducer.reduce(finalReduce, serializer.copy(currentReduced));
+ }
+ return finalReduce;
+ } else {
+ return currentReduced;
+ }
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void store(T element) throws Exception {
+ addToBufferIfEligible(element);
+ index++;
+ }
+
+ protected void addToBufferIfEligible(T element) throws Exception {
+ if (addCurrentToReduce(element) && currentReduced != null) {
+ reduced.add(currentReduced);
+ elementsPerPreAggregate.add(elementsSinceLastPreAggregate);
+ currentReduced = element;
+ elementsSinceLastPreAggregate = 1;
+ } else {
+ if (currentReduced == null) {
+ currentReduced = element;
+ } else {
+ currentReduced = reducer.reduce(serializer.copy(currentReduced), element);
+ }
+ elementsSinceLastPreAggregate++;
+ }
+ }
+
+ protected abstract boolean addCurrentToReduce(T next);
+
+ public void evict(int n) {
+ toRemove += n;
+
+ Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
+ while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
+ toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
+ reduced.removeFirst();
+ lastPreAggregateSize = elementsPerPreAggregate.peek();
+ }
+
+ if (lastPreAggregateSize == null) {
+ toRemove = 0;
+ }
+ }
+
+ public int max(int a, int b) {
+ if (a > b) {
+ return a;
+ } else {
+ return b;
+ }
+ }
+
+ @Override
+ public abstract SlidingPreReducer<T> clone();
+
+ @Override
+ public String toString() {
+ return currentReduced.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
new file mode 100644
index 0000000..af2239c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+
+/**
+ * Non-grouped pre-reducer for tumbling eviction policy.
+ */
+public class SlidingTimePreReducer<T> extends SlidingPreReducer<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private int windowSize;
+ private int slideSize;
+ private TimestampWrapper<T> timestampWrapper;
+ private T lastStored;
+ protected long windowStartTime;
+
+ public SlidingTimePreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+ int windowSize, int slideSize, TimestampWrapper<T> timestampWrapper) {
+ super(reducer, serializer);
+ if (windowSize > slideSize) {
+ this.windowSize = windowSize;
+ this.slideSize = slideSize;
+ } else {
+ throw new RuntimeException(
+ "Window size needs to be larger than slide size for the sliding pre-reducer");
+ }
+ this.timestampWrapper = timestampWrapper;
+ this.windowStartTime = timestampWrapper.getStartTime();
+ }
+
+ @Override
+ public void store(T element) throws Exception {
+ super.store(element);
+ lastStored = element;
+ }
+
+ @Override
+ public SlidingTimePreReducer<T> clone() {
+ return new SlidingTimePreReducer<T>(reducer, serializer, windowSize, slideSize,
+ timestampWrapper);
+ }
+
+ @Override
+ public String toString() {
+ return currentReduced.toString();
+ }
+
+ @Override
+ protected void updateIndexAtEmit() {
+ index = 0;
+ long lastTime = timestampWrapper.getTimestamp(lastStored);
+ if (lastTime - windowStartTime >= slideSize) {
+ windowStartTime = windowStartTime + slideSize;
+ }
+ }
+
+ @Override
+ public void evict(int n) {
+ toRemove += n;
+ Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
+
+ while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
+ toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
+ reduced.removeFirst();
+ lastPreAggregateSize = elementsPerPreAggregate.peek();
+ }
+
+ if (toRemove > 0 && lastPreAggregateSize == null) {
+ currentReduced = null;
+ toRemove = 0;
+ }
+ }
+
+ @Override
+ protected boolean addCurrentToReduce(T next) {
+ return windowStartTime == timestampWrapper.getStartTime()
+ || timestampWrapper.getTimestamp(next) - windowStartTime >= slideSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
index 7403ffe..9431a99 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
@@ -38,7 +38,6 @@ public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePr
private Map<Object, T> reducedValues;
- private int numOfElements = 0;
private TypeSerializer<T> serializer;
public TumblingGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector,
@@ -56,7 +55,6 @@ public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePr
currentWindow.addAll(reducedValues.values());
collector.collect(currentWindow);
reducedValues.clear();
- numOfElements = 0;
return true;
} else {
return false;
@@ -76,16 +74,11 @@ public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePr
}
reducedValues.put(key, reduced);
- numOfElements++;
}
public void evict(int n) {
}
- public int size() {
- return numOfElements;
- }
-
@Override
public TumblingGroupedPreReducer<T> clone() {
return new TumblingGroupedPreReducer<T>(reducer, keySelector, serializer);
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
index 35cf60e..58b30a6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
@@ -32,7 +32,6 @@ public class TumblingPreReducer<T> implements WindowBuffer<T>, CompletePreAggreg
private ReduceFunction<T> reducer;
private T reduced;
- private int numOfElements = 0;
private TypeSerializer<T> serializer;
public TumblingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer) {
@@ -46,7 +45,6 @@ public class TumblingPreReducer<T> implements WindowBuffer<T>, CompletePreAggreg
currentWindow.add(reduced);
collector.collect(currentWindow);
reduced = null;
- numOfElements = 0;
return true;
} else {
return false;
@@ -59,16 +57,11 @@ public class TumblingPreReducer<T> implements WindowBuffer<T>, CompletePreAggreg
} else {
reduced = reducer.reduce(serializer.copy(reduced), element);
}
- numOfElements++;
}
public void evict(int n) {
}
- public int size() {
- return numOfElements;
- }
-
@Override
public TumblingPreReducer<T> clone() {
return new TumblingPreReducer<T>(reducer, serializer);
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
index e0429ab..2dd50db 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
@@ -34,8 +34,6 @@ public interface WindowBuffer<T> extends Serializable, Cloneable {
public boolean emitWindow(Collector<StreamWindow<T>> collector);
- public int size();
-
public WindowBuffer<T> clone();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index 25b5f87..bcfa188 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -89,6 +89,8 @@ public class WindowIntegrationTest implements Serializable {
inputs.add(11);
inputs.add(11);
+ KeySelector<Integer, ?> key = new ModKey(2);
+
Timestamp<Integer> ts = new Timestamp<Integer>() {
private static final long serialVersionUID = 1L;
@@ -103,14 +105,12 @@ public class WindowIntegrationTest implements Serializable {
DataStream<Integer> source = env.fromCollection(inputs);
- source.window(Time.of(2, ts)).every(Time.of(3, ts)).sum(0).getDiscretizedStream()
+ source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 1)).sum(0).getDiscretizedStream()
.addSink(new CentralSink1());
source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
.flatten().addSink(new CentralSink2());
- KeySelector<Integer, ?> key = new ModKey(2);
-
source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
.addSink(new DistributedSink1());
@@ -126,12 +126,17 @@ public class WindowIntegrationTest implements Serializable {
source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten()
.addSink(new DistributedSink4());
+ source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
+ .addSink(new DistributedSink5());
+
env.execute();
- // sum ( Time of 2 slide 3 )
+ // sum ( Time of 3 slide 2 )
List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
expected1.add(StreamWindow.fromElements(5));
+ expected1.add(StreamWindow.fromElements(11));
expected1.add(StreamWindow.fromElements(9));
+ expected1.add(StreamWindow.fromElements(10));
expected1.add(StreamWindow.fromElements(32));
validateOutput(expected1, CentralSink1.windows);
@@ -193,6 +198,13 @@ public class WindowIntegrationTest implements Serializable {
validateOutput(expected7, DistributedSink4.windows);
+ List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>();
+ expected8.add(StreamWindow.fromElements(12));
+ expected8.add(StreamWindow.fromElements(9));
+ expected8.add(StreamWindow.fromElements(32));
+
+ validateOutput(expected8, DistributedSink5.windows);
+
}
public static <R> void validateOutput(List<R> expected, List<R> actual) {
@@ -317,4 +329,21 @@ public class WindowIntegrationTest implements Serializable {
}
}
+
+ @SuppressWarnings("serial")
+ private static class DistributedSink5 implements SinkFunction<StreamWindow<Integer>> {
+
+ public static List<StreamWindow<Integer>> windows = Collections
+ .synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+ @Override
+ public void invoke(StreamWindow<Integer> value) throws Exception {
+ windows.add(value);
+ }
+
+ @Override
+ public void cancel() {
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
index bb756c7..967c719 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
@@ -41,8 +41,6 @@ public class BasicWindowBufferTest {
wb.store(2);
wb.store(10);
- assertEquals(2, wb.size());
-
wb.emitWindow(collector);
assertEquals(1, collected.size());
@@ -51,8 +49,6 @@ public class BasicWindowBufferTest {
wb.store(4);
wb.evict(2);
- assertEquals(1, wb.size());
-
wb.emitWindow(collector);
assertEquals(2, collected.size());
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
new file mode 100644
index 0000000..3ce65f1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.junit.Test;
+
+public class SlidingCountPreReducerTest {
+
+ TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
+
+ ReduceFunction<Integer> reducer = new SumReducer();
+
+ @Test
+ public void testPreReduce1() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
+ serializer, 3, 2, 0);
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.emitWindow(collector);
+ preReducer.store(3);
+ preReducer.store(4);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(9);
+ preReducer.store(10);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(11);
+ preReducer.store(12);
+ preReducer.emitWindow(collector);
+ preReducer.store(13);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(3));
+ expected.add(StreamWindow.fromElements(9));
+ expected.add(StreamWindow.fromElements(15));
+ expected.add(StreamWindow.fromElements(21));
+ expected.add(StreamWindow.fromElements(27));
+ expected.add(StreamWindow.fromElements(33));
+
+ assertEquals(expected, collector.getCollected());
+ }
+
+ @Test
+ public void testPreReduce2() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
+ serializer, 5, 2, 0);
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.emitWindow(collector);
+ preReducer.store(3);
+ preReducer.store(4);
+ preReducer.emitWindow(collector);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(9);
+ preReducer.store(10);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(11);
+ preReducer.store(12);
+ preReducer.emitWindow(collector);
+ preReducer.store(13);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(3));
+ expected.add(StreamWindow.fromElements(10));
+ expected.add(StreamWindow.fromElements(20));
+ expected.add(StreamWindow.fromElements(30));
+ expected.add(StreamWindow.fromElements(40));
+ expected.add(StreamWindow.fromElements(50));
+
+ assertEquals(expected, collector.getCollected());
+ }
+
+ @Test
+ public void testPreReduce3() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
+ serializer, 6, 3, 0);
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.store(3);
+ preReducer.emitWindow(collector);
+ preReducer.store(4);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.emitWindow(collector);
+ preReducer.evict(3);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.store(9);
+ preReducer.emitWindow(collector);
+ preReducer.evict(3);
+ preReducer.store(10);
+ preReducer.store(11);
+ preReducer.store(12);
+ preReducer.emitWindow(collector);
+ preReducer.evict(3);
+ preReducer.store(13);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(6));
+ expected.add(StreamWindow.fromElements(21));
+ expected.add(StreamWindow.fromElements(39));
+ expected.add(StreamWindow.fromElements(57));
+
+ assertEquals(expected, collector.getCollected());
+ }
+
+ @Test
+ public void testPreReduce4() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
+ serializer, 5, 1, 2);
+
+ preReducer.store(1);
+ preReducer.evict(1);
+ preReducer.store(1);
+ preReducer.evict(1);
+ preReducer.store(1);
+ preReducer.emitWindow(collector);
+ preReducer.store(2);
+ preReducer.emitWindow(collector);
+ preReducer.store(3);
+ preReducer.emitWindow(collector);
+ preReducer.store(4);
+ preReducer.emitWindow(collector);
+ preReducer.store(5);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+ preReducer.store(6);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+ preReducer.store(7);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+ preReducer.store(8);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(1));
+ expected.add(StreamWindow.fromElements(3));
+ expected.add(StreamWindow.fromElements(6));
+ expected.add(StreamWindow.fromElements(10));
+ expected.add(StreamWindow.fromElements(15));
+ expected.add(StreamWindow.fromElements(20));
+ expected.add(StreamWindow.fromElements(25));
+ expected.add(StreamWindow.fromElements(30));
+
+ assertEquals(expected, collector.getCollected());
+ }
+
+ private static class SumReducer implements ReduceFunction<Integer> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer reduce(Integer value1, Integer value2) throws Exception {
+ return value1 + value2;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
new file mode 100644
index 0000000..bc3b13b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.junit.Test;
+
+public class SlidingTimePreReducerTest {
+
+ TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
+
+ ReduceFunction<Integer> reducer = new SumReducer();
+
+ @Test
+ public void testPreReduce1() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
+ serializer, 3, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(Integer value) {
+ return value;
+ }
+ }, 1));
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.emitWindow(collector);
+ preReducer.store(3);
+ preReducer.store(4);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(9);
+ preReducer.store(10);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(11);
+ preReducer.store(12);
+ preReducer.emitWindow(collector);
+ preReducer.store(13);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(3));
+ expected.add(StreamWindow.fromElements(9));
+ expected.add(StreamWindow.fromElements(15));
+ expected.add(StreamWindow.fromElements(21));
+ expected.add(StreamWindow.fromElements(27));
+ expected.add(StreamWindow.fromElements(33));
+
+ assertEquals(expected, collector.getCollected());
+ }
+
+ @Test
+ public void testPreReduce2() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
+ serializer, 5, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(Integer value) {
+ return value;
+ }
+ }, 1));
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.emitWindow(collector);
+ preReducer.store(3);
+ preReducer.store(4);
+ preReducer.emitWindow(collector);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(9);
+ preReducer.store(10);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(11);
+ preReducer.store(12);
+ preReducer.emitWindow(collector);
+ preReducer.store(13);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(3));
+ expected.add(StreamWindow.fromElements(10));
+ expected.add(StreamWindow.fromElements(20));
+ expected.add(StreamWindow.fromElements(30));
+ expected.add(StreamWindow.fromElements(40));
+ expected.add(StreamWindow.fromElements(50));
+
+ assertEquals(expected, collector.getCollected());
+ }
+
+ @Test
+ public void testPreReduce3() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
+ serializer, 6, 3, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(Integer value) {
+ return value;
+ }
+ }, 1));
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.store(3);
+ preReducer.emitWindow(collector);
+ preReducer.store(4);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.emitWindow(collector);
+ preReducer.evict(3);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.store(9);
+ preReducer.emitWindow(collector);
+ preReducer.evict(3);
+ preReducer.store(10);
+ preReducer.store(11);
+ preReducer.store(12);
+ preReducer.emitWindow(collector);
+ preReducer.evict(3);
+ preReducer.store(13);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(6));
+ expected.add(StreamWindow.fromElements(21));
+ expected.add(StreamWindow.fromElements(39));
+ expected.add(StreamWindow.fromElements(57));
+
+ assertEquals(expected, collector.getCollected());
+ }
+
+ @Test
+ public void testPreReduce4() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
+ serializer, 3, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(Integer value) {
+ return value;
+ }
+ }, 1));
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.emitWindow(collector);
+ preReducer.store(3);
+ preReducer.store(4);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.emitWindow(collector);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(14);
+ preReducer.emitWindow(collector);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.emitWindow(collector);
+ preReducer.store(21);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+
+ preReducer.store(9);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(3));
+ expected.add(StreamWindow.fromElements(9));
+ expected.add(StreamWindow.fromElements(15));
+ expected.add(StreamWindow.fromElements(21));
+ expected.add(StreamWindow.fromElements(8));
+ expected.add(StreamWindow.fromElements(8));
+ expected.add(StreamWindow.fromElements(14));
+ expected.add(StreamWindow.fromElements(14));
+ expected.add(StreamWindow.fromElements(21));
+
+ assertEquals(expected, collector.getCollected());
+ }
+
+ private static class SumReducer implements ReduceFunction<Integer> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer reduce(Integer value1, Integer value2) throws Exception {
+ return value1 + value2;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
index 437fcd6..95aace0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
@@ -67,9 +67,6 @@ public class TumblingGroupedPreReducerTest {
wb.store(serializer.copy(inputs.get(0)));
wb.store(serializer.copy(inputs.get(1)));
-
- assertEquals(2, wb.size());
-
wb.emitWindow(collector);
assertEquals(1, collected.size());
@@ -77,9 +74,6 @@ public class TumblingGroupedPreReducerTest {
assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(1, 1),
new Tuple2<Integer, Integer>(0, 0)), collected.get(0));
- // Test automatic eviction
- assertEquals(0, wb.size());
-
wb.store(serializer.copy(inputs.get(0)));
wb.store(serializer.copy(inputs.get(1)));
wb.store(serializer.copy(inputs.get(2)));
@@ -89,7 +83,6 @@ public class TumblingGroupedPreReducerTest {
wb.store(serializer.copy(inputs.get(3)));
- assertEquals(4, wb.size());
wb.emitWindow(collector);
assertEquals(2, collected.size());
http://git-wip-us.apache.org/repos/asf/flink/blob/70abc16d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
index 31be227..ddaf747 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
@@ -58,17 +58,12 @@ public class TumblingPreReducerTest {
wb.store(serializer.copy(inputs.get(0)));
wb.store(serializer.copy(inputs.get(1)));
- assertEquals(2, wb.size());
-
wb.emitWindow(collector);
assertEquals(1, collected.size());
assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(3, 1)),
collected.get(0));
- // Test automatic eviction
- assertEquals(0, wb.size());
-
wb.store(serializer.copy(inputs.get(0)));
wb.store(serializer.copy(inputs.get(1)));
wb.store(serializer.copy(inputs.get(2)));
@@ -78,7 +73,6 @@ public class TumblingPreReducerTest {
wb.store(serializer.copy(inputs.get(3)));
- assertEquals(4, wb.size());
wb.emitWindow(collector);
assertEquals(2, collected.size());
[3/5] flink git commit: [FLINK-1660] [streaming] Increased timeout
for MultiTriggerPolicyTest and introduced a constant representing it.
Posted by gy...@apache.org.
[FLINK-1660] [streaming] Increased timeout for MultiTriggerPolicyTest and introduced a constant representing it.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f09c0af2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f09c0af2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f09c0af2
Branch: refs/heads/master
Commit: f09c0af2f9a2f7844ccef0983699782c594588ca
Parents: 2bba2b3
Author: Jonas Traub (powibol) <jo...@s-traub.com>
Authored: Tue Mar 10 09:20:04 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue Mar 10 16:33:43 2015 +0100
----------------------------------------------------------------------
.../api/windowing/policy/MultiTriggerPolicyTest.java | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f09c0af2/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
index de6cdcb..9964cd8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
@@ -31,6 +31,15 @@ import org.junit.Test;
public class MultiTriggerPolicyTest {
/**
+ * This constant defines the timeout for the test of the start ups of the
+ * active trigger policy Threads.
+ */
+ private static final int TIMEOUT = 120000;
+
+ // Use this to increase the timeout to be as long as possible.
+ // private static final int TIMEOUT=Integer.MAX_VALUE;
+
+ /**
* This test covers all regular notify call. It takes no fake elements into
* account.
*/
@@ -138,8 +147,8 @@ public class MultiTriggerPolicyTest {
Runnable runnable = multiTrigger.createActiveTriggerRunnable(cb);
new Thread(runnable).start();
- assertTrue("Even after 10000ms not all active policy runnables were started.",
- cb.check(10000, 1, 2, 3));
+ assertTrue("Even after " + TIMEOUT + "ms not all active policy runnables were started.",
+ cb.check(TIMEOUT, 1, 2, 3));
}
private void arrayEqualityCheck(Object[] array1, Object[] array2) {
[5/5] flink git commit: [FLINK-1619] [FLINK-1620] Grouped sliding
prereducers added for Time and Count
Posted by gy...@apache.org.
[FLINK-1619] [FLINK-1620] Grouped sliding prereducers added for Time and Count
Closes #465
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2522f028
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2522f028
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2522f028
Branch: refs/heads/master
Commit: 2522f028bde9af57ba52904855265e6a8519e724
Parents: aacd4f2
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Mar 9 13:43:32 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Wed Mar 11 01:22:10 2015 +0100
----------------------------------------------------------------------
.../api/datastream/DataStreamSource.java | 3 +
.../api/datastream/WindowedDataStream.java | 50 ++--
.../streaming/api/windowing/WindowUtils.java | 7 +-
.../SlidingCountGroupedPreReducer.java | 86 ++++++
.../windowbuffer/SlidingGroupedPreReducer.java | 148 ++++++++++
.../windowbuffer/SlidingPreReducer.java | 31 +-
.../SlidingTimeGroupedPreReducer.java | 100 +++++++
.../windowing/WindowIntegrationTest.java | 155 +++++-----
.../SlidingCountGroupedPreReducerTest.java | 220 +++++++++++++++
.../SlidingTimeGroupedPreReducerTest.java | 280 +++++++++++++++++++
10 files changed, 979 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index b596cbd..0dda976 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -35,6 +35,9 @@ public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataS
TypeInformation<OUT> outTypeInfo, StreamInvokable<?, ?> invokable, boolean isParallel) {
super(environment, operatorType, outTypeInfo, invokable);
this.isParallel = isParallel;
+ if (!isParallel) {
+ setParallelism(1);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 6ce9f9f..8199b22 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -56,7 +56,9 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
import org.apache.flink.streaming.api.windowing.windowbuffer.CompletePreAggregator;
+import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountPreReducer;
+import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimePreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer;
@@ -337,8 +339,10 @@ public class WindowedDataStream<OUT> {
}
private int getDiscretizerParallelism() {
- return isLocal || WindowUtils.isParallelPolicy(getTrigger(), getEviction())
- || (discretizerKey != null) ? dataStream.environment.getDegreeOfParallelism() : 1;
+ return isLocal
+ || WindowUtils.isParallelPolicy(getTrigger(), getEviction(),
+ dataStream.getParallelism()) || (discretizerKey != null) ? dataStream.environment
+ .getDegreeOfParallelism() : 1;
}
private StreamInvokable<OUT, WindowEvent<OUT>> getDiscretizer() {
@@ -381,21 +385,35 @@ public class WindowedDataStream<OUT> {
(ReduceFunction<OUT>) transformation.getUDF(), groupByKey, getType()
.createSerializer(getExecutionConfig()));
}
- } else if (WindowUtils.isSlidingCountPolicy(trigger, eviction) && groupByKey == null) {
-
- return new SlidingCountPreReducer<OUT>(
- clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream.getType()
- .createSerializer(getExecutionConfig()),
- WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
- ((CountTriggerPolicy<?>) trigger).getStart());
-
- } else if (WindowUtils.isSlidingTimePolicy(trigger, eviction) && groupByKey == null) {
+ } else if (WindowUtils.isSlidingCountPolicy(trigger, eviction)) {
+ if (groupByKey == null) {
+ return new SlidingCountPreReducer<OUT>(
+ clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+ .getType().createSerializer(getExecutionConfig()),
+ WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
+ ((CountTriggerPolicy<?>) trigger).getStart());
+ } else {
+ return new SlidingCountGroupedPreReducer<OUT>(
+ clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+ .getType().createSerializer(getExecutionConfig()), groupByKey,
+ WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
+ ((CountTriggerPolicy<?>) trigger).getStart());
+ }
- return new SlidingTimePreReducer<OUT>(
- (ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
- .createSerializer(getExecutionConfig()),
- WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
- WindowUtils.getTimeStampWrapper(trigger));
+ } else if (WindowUtils.isSlidingTimePolicy(trigger, eviction)) {
+ if (groupByKey == null) {
+ return new SlidingTimePreReducer<OUT>(
+ (ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
+ .createSerializer(getExecutionConfig()),
+ WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
+ WindowUtils.getTimeStampWrapper(trigger));
+ } else {
+ return new SlidingTimeGroupedPreReducer<OUT>(
+ (ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
+ .createSerializer(getExecutionConfig()), groupByKey,
+ WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
+ WindowUtils.getTimeStampWrapper(trigger));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
index 246aff2..0649b4d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
@@ -44,9 +44,10 @@ public class WindowUtils {
}
}
- public static boolean isParallelPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
- return (eviction instanceof CountEvictionPolicy && (trigger instanceof CountTriggerPolicy || trigger instanceof TimeTriggerPolicy))
- || (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy);
+ public static boolean isParallelPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction,
+ int inputParallelism) {
+ return inputParallelism != 1
+ && ((eviction instanceof CountEvictionPolicy && (trigger instanceof CountTriggerPolicy || trigger instanceof TimeTriggerPolicy)) || (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy));
}
public static boolean isSlidingTimePolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
new file mode 100644
index 0000000..48bf1b6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+
+/**
+ * Non-grouped pre-reducer for tumbling eviction policy.
+ */
+public class SlidingCountGroupedPreReducer<T> extends SlidingGroupedPreReducer<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private long windowSize;
+ private long slideSize;
+ private int start;
+
+ protected long index = 0;
+
+ public SlidingCountGroupedPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+ KeySelector<T, ?> key, long windowSize, long slideSize, int start) {
+ super(reducer, serializer, key);
+ if (windowSize > slideSize) {
+ this.windowSize = windowSize;
+ this.slideSize = slideSize;
+ this.start = start;
+ } else {
+ throw new RuntimeException(
+ "Window size needs to be larger than slide size for the sliding pre-reducer");
+ }
+ index = index - start;
+ }
+
+ @Override
+ protected void afterStore() {
+ index++;
+ }
+
+ @Override
+ public void store(T element) throws Exception {
+ if (index >= 0) {
+ super.store(element);
+ } else {
+ index++;
+ }
+ }
+
+ @Override
+ protected boolean currentEligible(T next) {
+ if (index <= slideSize) {
+ return true;
+ } else {
+ return index == windowSize;
+ }
+ }
+
+ @Override
+ protected void afterEmit() {
+ if (index >= slideSize) {
+ index = index - slideSize;
+ }
+ }
+
+ @Override
+ public SlidingCountGroupedPreReducer<T> clone() {
+ return new SlidingCountGroupedPreReducer<T>(reducer, serializer, key, windowSize,
+ slideSize, start);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
new file mode 100644
index 0000000..aa1d76c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+
+public abstract class SlidingGroupedPreReducer<T> extends SlidingPreReducer<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ protected Map<Object, T> currentReducedMap = new HashMap<Object, T>();
+ protected LinkedList<Map<Object, T>> reducedMap = new LinkedList<Map<Object, T>>();
+
+ protected KeySelector<T, ?> key;
+
+ public SlidingGroupedPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+ KeySelector<T, ?> key) {
+ super(reducer, serializer);
+ this.key = key;
+ }
+
+ public boolean addFinalAggregate(StreamWindow<T> currentWindow) throws Exception {
+ Map<Object, T> finalReduce = null;
+
+ if (!reducedMap.isEmpty()) {
+ finalReduce = reducedMap.get(0);
+ for (int i = 1; i < reducedMap.size(); i++) {
+ finalReduce = reduceMaps(finalReduce, reducedMap.get(i));
+
+ }
+ if (currentReducedMap != null) {
+ finalReduce = reduceMaps(finalReduce, currentReducedMap);
+ }
+
+ } else {
+ finalReduce = currentReducedMap;
+ }
+
+ if (finalReduce != null) {
+ currentWindow.addAll(finalReduce.values());
+ return true;
+ } else {
+ return false;
+ }
+
+ }
+
+ private Map<Object, T> reduceMaps(Map<Object, T> first, Map<Object, T> second) throws Exception {
+
+ Map<Object, T> reduced = new HashMap<Object, T>();
+
+ // Get the common keys in the maps
+ Set<Object> interSection = new HashSet<Object>();
+ Set<Object> diffFirst = new HashSet<Object>();
+ Set<Object> diffSecond = new HashSet<Object>();
+
+ for (Object key : first.keySet()) {
+ if (second.containsKey(key)) {
+ interSection.add(key);
+ } else {
+ diffFirst.add(key);
+ }
+ }
+
+ for (Object key : second.keySet()) {
+ if (!interSection.contains(key)) {
+ diffSecond.add(key);
+ }
+ }
+
+ // Reduce the common keys
+ for (Object key : interSection) {
+ reduced.put(
+ key,
+ reducer.reduce(serializer.copy(first.get(key)),
+ serializer.copy(second.get(key))));
+ }
+
+ for (Object key : diffFirst) {
+ reduced.put(key, first.get(key));
+ }
+
+ for (Object key : diffSecond) {
+ reduced.put(key, second.get(key));
+ }
+
+ return reduced;
+ }
+
+ protected void updateCurrent(T element) throws Exception {
+ if (currentReducedMap == null) {
+ currentReducedMap = new HashMap<Object, T>();
+ currentReducedMap.put(key.getKey(element), element);
+ } else {
+ Object nextKey = key.getKey(element);
+ T last = currentReducedMap.get(nextKey);
+ if (last == null) {
+ currentReducedMap.put(nextKey, element);
+ } else {
+ currentReducedMap.put(nextKey, reducer.reduce(serializer.copy(last), element));
+ }
+ }
+ }
+
+ @Override
+ protected void removeLastReduced() {
+ reducedMap.removeFirst();
+ }
+
+ @Override
+ protected void addCurrentToBuffer(T element) throws Exception {
+ reducedMap.add(currentReducedMap);
+ }
+
+ @Override
+ protected void resetCurrent() {
+ currentReducedMap = null;
+ }
+
+ @Override
+ protected boolean currentNotEmpty() {
+ return currentReducedMap != null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
index 1dd126d..27f7ff5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
@@ -103,15 +103,23 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
}
protected void addToBufferIfEligible(T element) throws Exception {
- if (currentEligible(element) && currentReduced != null) {
+ if (currentEligible(element) && currentNotEmpty()) {
addCurrentToBuffer(element);
elementsPerPreAggregate.add(elementsSinceLastPreAggregate);
- elementsSinceLastPreAggregate = 1;
- } else {
- updateCurrent(element);
-
- elementsSinceLastPreAggregate++;
+ elementsSinceLastPreAggregate = 0;
+ resetCurrent();
}
+ updateCurrent(element);
+
+ elementsSinceLastPreAggregate++;
+ }
+
+ protected void resetCurrent() {
+ currentReduced = null;
+ }
+
+ protected boolean currentNotEmpty() {
+ return currentReduced != null;
}
protected void updateCurrent(T element) throws Exception {
@@ -122,9 +130,8 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
}
}
- protected void addCurrentToBuffer(T element) {
+ protected void addCurrentToBuffer(T element) throws Exception {
reduced.add(currentReduced);
- currentReduced = element;
}
protected abstract boolean currentEligible(T next);
@@ -135,7 +142,7 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
- reduced.removeFirst();
+ removeLastReduced();
lastPreAggregateSize = elementsPerPreAggregate.peek();
}
@@ -144,7 +151,11 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
}
}
- public int max(int a, int b) {
+ protected void removeLastReduced() {
+ reduced.removeFirst();
+ }
+
+ public static int max(int a, int b) {
if (a > b) {
return a;
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
new file mode 100644
index 0000000..1c293af
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+
+/**
+ * Non-grouped pre-reducer for tumbling eviction policy.
+ */
+public class SlidingTimeGroupedPreReducer<T> extends SlidingGroupedPreReducer<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private long windowSize;
+ private long slideSize;
+ private TimestampWrapper<T> timestampWrapper;
+ private T lastStored;
+ protected long windowStartTime;
+
+ public SlidingTimeGroupedPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
+ KeySelector<T, ?> key, long windowSize, long slideSize,
+ TimestampWrapper<T> timestampWrapper) {
+ super(reducer, serializer, key);
+ if (windowSize > slideSize) {
+ this.windowSize = windowSize;
+ this.slideSize = slideSize;
+ } else {
+ throw new RuntimeException(
+ "Window size needs to be larger than slide size for the sliding pre-reducer");
+ }
+ this.timestampWrapper = timestampWrapper;
+ this.windowStartTime = timestampWrapper.getStartTime();
+ }
+
+ @Override
+ public void store(T element) throws Exception {
+ super.store(element);
+ lastStored = element;
+ }
+
+ @Override
+ public SlidingTimeGroupedPreReducer<T> clone() {
+ return new SlidingTimeGroupedPreReducer<T>(reducer, serializer, key, windowSize, slideSize,
+ timestampWrapper);
+ }
+
+ @Override
+ public String toString() {
+ return currentReducedMap.toString();
+ }
+
+ @Override
+ protected void afterEmit() {
+ long lastTime = timestampWrapper.getTimestamp(lastStored);
+ if (lastTime - windowStartTime >= slideSize) {
+ windowStartTime = windowStartTime + slideSize;
+ }
+ }
+
+ @Override
+ public void evict(int n) {
+ toRemove += n;
+ Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
+
+ while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
+ toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
+ removeLastReduced();
+ lastPreAggregateSize = elementsPerPreAggregate.peek();
+ }
+
+ if (toRemove > 0 && lastPreAggregateSize == null) {
+ resetCurrent();
+ toRemove = 0;
+ }
+ }
+
+ @Override
+ protected boolean currentEligible(T next) {
+ return windowStartTime == timestampWrapper.getStartTime()
+ || timestampWrapper.getTimestamp(next) - windowStartTime >= slideSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index bcfa188..d7338a0 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -44,7 +44,7 @@ public class WindowIntegrationTest implements Serializable {
private static final Integer MEMORYSIZE = 32;
@SuppressWarnings("serial")
- private static class ModKey implements KeySelector<Integer, Integer> {
+ public static class ModKey implements KeySelector<Integer, Integer> {
private int m;
public ModKey(int m) {
@@ -126,82 +126,93 @@ public class WindowIntegrationTest implements Serializable {
source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten()
.addSink(new DistributedSink4());
- source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
- .addSink(new DistributedSink5());
+ source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).groupBy(new ModKey(2)).sum(0)
+ .getDiscretizedStream().addSink(new DistributedSink5());
env.execute();
- // sum ( Time of 3 slide 2 )
- List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
- expected1.add(StreamWindow.fromElements(5));
- expected1.add(StreamWindow.fromElements(11));
- expected1.add(StreamWindow.fromElements(9));
- expected1.add(StreamWindow.fromElements(10));
- expected1.add(StreamWindow.fromElements(32));
-
- validateOutput(expected1, CentralSink1.windows);
-
- // Tumbling Time of 4 grouped by mod 2
- List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
- expected2.add(StreamWindow.fromElements(2, 2, 4));
- expected2.add(StreamWindow.fromElements(1, 3));
- expected2.add(StreamWindow.fromElements(5));
- expected2.add(StreamWindow.fromElements(10));
- expected2.add(StreamWindow.fromElements(11, 11));
-
- validateOutput(expected2, CentralSink2.windows);
-
- // groupby mod 2 sum ( Tumbling Time of 4)
- List<StreamWindow<Integer>> expected3 = new ArrayList<StreamWindow<Integer>>();
- expected3.add(StreamWindow.fromElements(4));
- expected3.add(StreamWindow.fromElements(5));
- expected3.add(StreamWindow.fromElements(22));
- expected3.add(StreamWindow.fromElements(8));
- expected3.add(StreamWindow.fromElements(10));
-
- validateOutput(expected3, DistributedSink1.windows);
-
- // groupby mod3 Tumbling Count of 2 grouped by mod 2
- List<StreamWindow<Integer>> expected4 = new ArrayList<StreamWindow<Integer>>();
- expected4.add(StreamWindow.fromElements(2, 2));
- expected4.add(StreamWindow.fromElements(1));
- expected4.add(StreamWindow.fromElements(4));
- expected4.add(StreamWindow.fromElements(5, 11));
- expected4.add(StreamWindow.fromElements(10));
- expected4.add(StreamWindow.fromElements(11));
- expected4.add(StreamWindow.fromElements(3));
-
- validateOutput(expected4, DistributedSink2.windows);
-
- // min ( Time of 2 slide 3 )
- List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
- expected5.add(StreamWindow.fromElements(1));
- expected5.add(StreamWindow.fromElements(4));
- expected5.add(StreamWindow.fromElements(10));
-
- validateOutput(expected5, CentralSink3.windows);
-
- // groupby mod 2 max ( Tumbling Time of 4)
- List<StreamWindow<Integer>> expected6 = new ArrayList<StreamWindow<Integer>>();
- expected6.add(StreamWindow.fromElements(3));
- expected6.add(StreamWindow.fromElements(5));
- expected6.add(StreamWindow.fromElements(11));
- expected6.add(StreamWindow.fromElements(4));
- expected6.add(StreamWindow.fromElements(10));
-
- validateOutput(expected6, DistributedSink3.windows);
-
- List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>();
- expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
- expected7.add(StreamWindow.fromElements(10));
- expected7.add(StreamWindow.fromElements(10, 11, 11));
-
- validateOutput(expected7, DistributedSink4.windows);
+ // sum ( Time of 3 slide 2 )
+ List<StreamWindow<Integer>> expected1 = new
+ ArrayList<StreamWindow<Integer>>();
+ expected1.add(StreamWindow.fromElements(5));
+ expected1.add(StreamWindow.fromElements(11));
+ expected1.add(StreamWindow.fromElements(9));
+ expected1.add(StreamWindow.fromElements(10));
+ expected1.add(StreamWindow.fromElements(32));
+
+ validateOutput(expected1, CentralSink1.windows);
+
+ // Tumbling Time of 4 grouped by mod 2
+ List<StreamWindow<Integer>> expected2 = new
+ ArrayList<StreamWindow<Integer>>();
+ expected2.add(StreamWindow.fromElements(2, 2, 4));
+ expected2.add(StreamWindow.fromElements(1, 3));
+ expected2.add(StreamWindow.fromElements(5));
+ expected2.add(StreamWindow.fromElements(10));
+ expected2.add(StreamWindow.fromElements(11, 11));
+
+ validateOutput(expected2, CentralSink2.windows);
+
+ // groupby mod 2 sum ( Tumbling Time of 4)
+ List<StreamWindow<Integer>> expected3 = new
+ ArrayList<StreamWindow<Integer>>();
+ expected3.add(StreamWindow.fromElements(4));
+ expected3.add(StreamWindow.fromElements(5));
+ expected3.add(StreamWindow.fromElements(22));
+ expected3.add(StreamWindow.fromElements(8));
+ expected3.add(StreamWindow.fromElements(10));
+
+ validateOutput(expected3, DistributedSink1.windows);
+
+ // groupby mod3 Tumbling Count of 2 grouped by mod 2
+ List<StreamWindow<Integer>> expected4 = new
+ ArrayList<StreamWindow<Integer>>();
+ expected4.add(StreamWindow.fromElements(2, 2));
+ expected4.add(StreamWindow.fromElements(1));
+ expected4.add(StreamWindow.fromElements(4));
+ expected4.add(StreamWindow.fromElements(5, 11));
+ expected4.add(StreamWindow.fromElements(10));
+ expected4.add(StreamWindow.fromElements(11));
+ expected4.add(StreamWindow.fromElements(3));
+
+ validateOutput(expected4, DistributedSink2.windows);
+
+ // min ( Time of 2 slide 3 )
+ List<StreamWindow<Integer>> expected5 = new
+ ArrayList<StreamWindow<Integer>>();
+ expected5.add(StreamWindow.fromElements(1));
+ expected5.add(StreamWindow.fromElements(4));
+ expected5.add(StreamWindow.fromElements(10));
+
+ validateOutput(expected5, CentralSink3.windows);
+
+ // groupby mod 2 max ( Tumbling Time of 4)
+ List<StreamWindow<Integer>> expected6 = new
+ ArrayList<StreamWindow<Integer>>();
+ expected6.add(StreamWindow.fromElements(3));
+ expected6.add(StreamWindow.fromElements(5));
+ expected6.add(StreamWindow.fromElements(11));
+ expected6.add(StreamWindow.fromElements(4));
+ expected6.add(StreamWindow.fromElements(10));
+
+ validateOutput(expected6, DistributedSink3.windows);
+
+ List<StreamWindow<Integer>> expected7 = new
+ ArrayList<StreamWindow<Integer>>();
+ expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
+ expected7.add(StreamWindow.fromElements(10));
+ expected7.add(StreamWindow.fromElements(10, 11, 11));
+
+ validateOutput(expected7, DistributedSink4.windows);
List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>();
- expected8.add(StreamWindow.fromElements(12));
- expected8.add(StreamWindow.fromElements(9));
- expected8.add(StreamWindow.fromElements(32));
+ expected8.add(StreamWindow.fromElements(4, 8));
+ expected8.add(StreamWindow.fromElements(4, 5));
+ expected8.add(StreamWindow.fromElements(10, 22));
+
+ for (List<Integer> sw : DistributedSink5.windows) {
+ Collections.sort(sw);
+ }
validateOutput(expected8, DistributedSink5.windows);
http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
new file mode 100644
index 0000000..4e63f89
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import static org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedPreReducerTest.checkResults;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.invokable.operator.windowing.WindowIntegrationTest;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.junit.Test;
+
+public class SlidingCountGroupedPreReducerTest {
+
+ TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
+
+ ReduceFunction<Integer> reducer = new SumReducer();
+
+ KeySelector<Integer, ?> key = new WindowIntegrationTest.ModKey(2);
+
+ @Test
+ public void testPreReduce1() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
+ reducer, serializer, key, 3, 2, 0);
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.emitWindow(collector);
+ preReducer.store(3);
+ preReducer.store(4);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(9);
+ preReducer.store(10);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(11);
+ preReducer.store(12);
+ preReducer.emitWindow(collector);
+ preReducer.store(13);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(1, 2));
+ expected.add(StreamWindow.fromElements(3, 6));
+ expected.add(StreamWindow.fromElements(5, 10));
+ expected.add(StreamWindow.fromElements(7, 14));
+ expected.add(StreamWindow.fromElements(9, 18));
+ expected.add(StreamWindow.fromElements(11, 22));
+
+ checkResults(expected, collector.getCollected());
+ }
+
+ @Test
+ public void testPreReduce2() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
+ reducer, serializer, key, 5, 2, 0);
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.emitWindow(collector);
+ preReducer.store(3);
+ preReducer.store(4);
+ preReducer.emitWindow(collector);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(9);
+ preReducer.store(10);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(11);
+ preReducer.store(12);
+ preReducer.emitWindow(collector);
+ preReducer.store(13);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(1, 2));
+ expected.add(StreamWindow.fromElements(4, 6));
+ expected.add(StreamWindow.fromElements(12, 8));
+ expected.add(StreamWindow.fromElements(18, 12));
+ expected.add(StreamWindow.fromElements(24, 16));
+ expected.add(StreamWindow.fromElements(30, 20));
+
+ checkResults(expected, collector.getCollected());
+ }
+
+ @Test
+ public void testPreReduce3() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
+ reducer, serializer, key, 6, 3, 0);
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.store(3);
+ preReducer.emitWindow(collector);
+ preReducer.store(4);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.emitWindow(collector);
+ preReducer.evict(3);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.store(9);
+ preReducer.emitWindow(collector);
+ preReducer.evict(3);
+ preReducer.store(10);
+ preReducer.store(11);
+ preReducer.store(12);
+ preReducer.emitWindow(collector);
+ preReducer.evict(3);
+ preReducer.store(13);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(2, 4));
+ expected.add(StreamWindow.fromElements(9, 12));
+ expected.add(StreamWindow.fromElements(21, 18));
+ expected.add(StreamWindow.fromElements(30, 27));
+
+ checkResults(expected, collector.getCollected());
+ }
+
+ @Test
+ public void testPreReduce4() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
+ reducer, serializer, key, 5, 1, 2);
+
+ preReducer.store(1);
+ preReducer.evict(1);
+ preReducer.store(1);
+ preReducer.evict(1);
+ preReducer.store(1);
+ preReducer.emitWindow(collector);
+ preReducer.store(2);
+ preReducer.emitWindow(collector);
+ preReducer.store(3);
+ preReducer.emitWindow(collector);
+ preReducer.store(4);
+ preReducer.emitWindow(collector);
+ preReducer.store(5);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+ preReducer.store(6);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+ preReducer.store(7);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+ preReducer.store(8);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(1));
+ expected.add(StreamWindow.fromElements(1, 2));
+ expected.add(StreamWindow.fromElements(4, 2));
+ expected.add(StreamWindow.fromElements(4, 6));
+ expected.add(StreamWindow.fromElements(9, 6));
+ expected.add(StreamWindow.fromElements(8, 12));
+ expected.add(StreamWindow.fromElements(15, 10));
+ expected.add(StreamWindow.fromElements(12, 18));
+
+ checkResults(expected, collector.getCollected());
+ }
+
+ private static class SumReducer implements ReduceFunction<Integer> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer reduce(Integer value1, Integer value2) throws Exception {
+ return value1 + value2;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2522f028/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
new file mode 100644
index 0000000..83ad7ed
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.invokable.operator.windowing.WindowIntegrationTest;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.junit.Test;
+
+public class SlidingTimeGroupedPreReducerTest {
+
+ TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
+
+ ReduceFunction<Integer> reducer = new SumReducer();
+
+ KeySelector<Integer, ?> key = new WindowIntegrationTest.ModKey(2);
+
+ @Test
+ public void testPreReduce1() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
+ reducer, serializer, key, 3, 2, new TimestampWrapper<Integer>(
+ new Timestamp<Integer>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(Integer value) {
+ return value;
+ }
+ }, 1));
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.emitWindow(collector);
+ preReducer.store(3);
+ preReducer.store(4);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(9);
+ preReducer.store(10);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(11);
+ preReducer.store(12);
+ preReducer.emitWindow(collector);
+ preReducer.store(13);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(1, 2));
+ expected.add(StreamWindow.fromElements(3, 6));
+ expected.add(StreamWindow.fromElements(5, 10));
+ expected.add(StreamWindow.fromElements(7, 14));
+ expected.add(StreamWindow.fromElements(9, 18));
+ expected.add(StreamWindow.fromElements(11, 22));
+
+ checkResults(expected, collector.getCollected());
+ }
+
+ protected static void checkResults(List<StreamWindow<Integer>> expected,
+ List<StreamWindow<Integer>> actual) {
+
+ for (StreamWindow<Integer> sw : expected) {
+ Collections.sort(sw);
+ }
+
+ for (StreamWindow<Integer> sw : actual) {
+ Collections.sort(sw);
+ }
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testPreReduce2() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
+ reducer, serializer, key, 5, 2, new TimestampWrapper<Integer>(
+ new Timestamp<Integer>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(Integer value) {
+ return value;
+ }
+ }, 1));
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.emitWindow(collector);
+ preReducer.store(3);
+ preReducer.store(4);
+ preReducer.emitWindow(collector);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(9);
+ preReducer.store(10);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(11);
+ preReducer.store(12);
+ preReducer.emitWindow(collector);
+ preReducer.store(13);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(1, 2));
+ expected.add(StreamWindow.fromElements(4, 6));
+ expected.add(StreamWindow.fromElements(12, 8));
+ expected.add(StreamWindow.fromElements(18, 12));
+ expected.add(StreamWindow.fromElements(24, 16));
+ expected.add(StreamWindow.fromElements(30, 20));
+
+ checkResults(expected, collector.getCollected());
+ }
+
+ @Test
+ public void testPreReduce3() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
+ reducer, serializer, key, 6, 3, new TimestampWrapper<Integer>(
+ new Timestamp<Integer>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(Integer value) {
+ return value;
+ }
+ }, 1));
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.store(3);
+ preReducer.emitWindow(collector);
+ preReducer.store(4);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.emitWindow(collector);
+ preReducer.evict(3);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.store(9);
+ preReducer.emitWindow(collector);
+ preReducer.evict(3);
+ preReducer.store(10);
+ preReducer.store(11);
+ preReducer.store(12);
+ preReducer.emitWindow(collector);
+ preReducer.evict(3);
+ preReducer.store(13);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(2, 4));
+ expected.add(StreamWindow.fromElements(9, 12));
+ expected.add(StreamWindow.fromElements(21, 18));
+ expected.add(StreamWindow.fromElements(30, 27));
+
+ checkResults(expected, collector.getCollected());
+ }
+
+ @Test
+ public void testPreReduce4() throws Exception {
+ TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+
+ SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
+ reducer, serializer, key, 3, 2, new TimestampWrapper<Integer>(
+ new Timestamp<Integer>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(Integer value) {
+ return value;
+ }
+ }, 1));
+
+ preReducer.store(1);
+ preReducer.store(2);
+ preReducer.emitWindow(collector);
+ preReducer.store(3);
+ preReducer.store(4);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(5);
+ preReducer.store(6);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(7);
+ preReducer.store(8);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.emitWindow(collector);
+ preReducer.emitWindow(collector);
+ preReducer.evict(2);
+ preReducer.store(14);
+ preReducer.emitWindow(collector);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+ preReducer.emitWindow(collector);
+ preReducer.store(21);
+ preReducer.emitWindow(collector);
+ preReducer.evict(1);
+ preReducer.emitWindow(collector);
+
+ preReducer.store(9);
+
+ List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+ expected.add(StreamWindow.fromElements(1, 2));
+ expected.add(StreamWindow.fromElements(3, 6));
+ expected.add(StreamWindow.fromElements(5, 10));
+ expected.add(StreamWindow.fromElements(7, 14));
+ expected.add(StreamWindow.fromElements(8));
+ expected.add(StreamWindow.fromElements(8));
+ expected.add(StreamWindow.fromElements(14));
+ expected.add(StreamWindow.fromElements(14));
+ expected.add(StreamWindow.fromElements(21));
+
+ checkResults(expected, collector.getCollected());
+ }
+
+ private static class SumReducer implements ReduceFunction<Integer> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer reduce(Integer value1, Integer value2) throws Exception {
+ return value1 + value2;
+ }
+
+ }
+}
[4/5] flink git commit: [FLINK-1657] [streaming] Count window
parallel discretization
Posted by gy...@apache.org.
[FLINK-1657] [streaming] Count window parallel discretization
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2be00ac7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2be00ac7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2be00ac7
Branch: refs/heads/master
Commit: 2be00ac7aca75c2fb9b66a554873b59970bfa21c
Parents: f09c0af
Author: Gyula Fora <gy...@apache.org>
Authored: Sat Mar 7 20:07:46 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue Mar 10 16:33:43 2015 +0100
----------------------------------------------------------------------
.../api/datastream/DiscretizedStream.java | 2 +
.../api/datastream/WindowedDataStream.java | 70 ++++++--------------
.../windowing/WindowBufferInvokable.java | 3 +
.../streaming/api/windowing/WindowUtils.java | 70 ++++++++++++++++++++
.../windowing/WindowIntegrationTest.java | 54 +++++++--------
5 files changed, 124 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2be00ac7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index dc2e987..451acf0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -35,6 +35,8 @@ import org.apache.flink.streaming.api.invokable.operator.windowing.WindowPartiti
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowReducer;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;
+import org.apache.flink.streaming.api.windowing.WindowUtils.WindowKey;
+import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation;
/**
* A {@link DiscretizedStream} represents a data stream that has been divided
http://git-wip-us.apache.org/repos/asf/flink/blob/2be00ac7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 12ef0e6..742ff22 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.datastream;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
@@ -44,6 +43,8 @@ import org.apache.flink.streaming.api.invokable.operator.windowing.WindowBufferI
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;
import org.apache.flink.streaming.api.windowing.WindowEvent;
+import org.apache.flink.streaming.api.windowing.WindowUtils;
+import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
@@ -71,16 +72,6 @@ import org.apache.flink.streaming.util.keys.KeySelectorUtil;
*/
public class WindowedDataStream<OUT> {
- protected enum WindowTransformation {
- REDUCEWINDOW, MAPWINDOW, NONE;
- private Function UDF;
-
- public WindowTransformation with(Function UDF) {
- this.UDF = UDF;
- return this;
- }
- }
-
protected DataStream<OUT> dataStream;
protected boolean isLocal = false;
@@ -266,8 +257,7 @@ public class WindowedDataStream<OUT> {
WindowTransformation transformation = WindowTransformation.REDUCEWINDOW
.with(reduceFunction);
- WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation, getTrigger(),
- getEviction(), discretizerKey);
+ WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation);
DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer);
@@ -324,11 +314,9 @@ public class WindowedDataStream<OUT> {
private DiscretizedStream<OUT> discretize(WindowTransformation transformation,
WindowBuffer<OUT> windowBuffer) {
- StreamInvokable<OUT, WindowEvent<OUT>> discretizer = getDiscretizer(getTrigger(),
- getEviction(), discretizerKey);
+ StreamInvokable<OUT, WindowEvent<OUT>> discretizer = getDiscretizer();
- StreamInvokable<WindowEvent<OUT>, StreamWindow<OUT>> bufferInvokable = getBufferInvokable(
- windowBuffer, discretizerKey);
+ StreamInvokable<WindowEvent<OUT>, StreamWindow<OUT>> bufferInvokable = getBufferInvokable(windowBuffer);
@SuppressWarnings({ "unchecked", "rawtypes" })
TypeInformation<WindowEvent<OUT>> bufferEventType = new TupleTypeInfo(WindowEvent.class,
@@ -346,30 +334,27 @@ public class WindowedDataStream<OUT> {
}
private int getDiscretizerParallelism() {
- return isLocal || (discretizerKey != null) ? dataStream.environment
- .getDegreeOfParallelism() : 1;
+ return isLocal || WindowUtils.isParallelPolicy(getTrigger(), getEviction())
+ || (discretizerKey != null) ? dataStream.environment.getDegreeOfParallelism() : 1;
}
- private StreamInvokable<OUT, WindowEvent<OUT>> getDiscretizer(TriggerPolicy<OUT> trigger,
- EvictionPolicy<OUT> eviction, KeySelector<OUT, ?> discretizerKey) {
-
+ private StreamInvokable<OUT, WindowEvent<OUT>> getDiscretizer() {
if (discretizerKey == null) {
- return new StreamDiscretizer<OUT>(trigger, eviction);
- } else if (trigger instanceof TimeTriggerPolicy
- && ((TimeTriggerPolicy<OUT>) trigger).timestampWrapper.isDefaultTimestamp()) {
+ return new StreamDiscretizer<OUT>(getTrigger(), getEviction());
+ } else if (WindowUtils.isSystemTimeTrigger(getTrigger())) {
return new GroupedTimeDiscretizer<OUT>(discretizerKey,
- (TimeTriggerPolicy<OUT>) trigger, (CloneableEvictionPolicy<OUT>) eviction);
- }
-
- else {
+ (TimeTriggerPolicy<OUT>) getTrigger(),
+ (CloneableEvictionPolicy<OUT>) getEviction());
+ } else {
return new GroupedStreamDiscretizer<OUT>(discretizerKey,
- (CloneableTriggerPolicy<OUT>) trigger, (CloneableEvictionPolicy<OUT>) eviction);
+ (CloneableTriggerPolicy<OUT>) getTrigger(),
+ (CloneableEvictionPolicy<OUT>) getEviction());
}
}
private StreamInvokable<WindowEvent<OUT>, StreamWindow<OUT>> getBufferInvokable(
- WindowBuffer<OUT> windowBuffer, KeySelector<OUT, ?> discretizerKey) {
+ WindowBuffer<OUT> windowBuffer) {
if (discretizerKey == null) {
return new WindowBufferInvokable<OUT>(windowBuffer);
} else {
@@ -378,20 +363,18 @@ public class WindowedDataStream<OUT> {
}
@SuppressWarnings("unchecked")
- private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation,
- TriggerPolicy<OUT> trigger, EvictionPolicy<OUT> eviction,
- KeySelector<OUT, ?> discretizerKey) {
+ private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation) {
if (transformation == WindowTransformation.REDUCEWINDOW
- && eviction instanceof TumblingEvictionPolicy) {
+ && getEviction() instanceof TumblingEvictionPolicy) {
if (groupByKey == null) {
return new TumblingPreReducer<OUT>(
- clean((ReduceFunction<OUT>) transformation.UDF), getType()
+ clean((ReduceFunction<OUT>) transformation.getUDF()), getType()
.createSerializer(getExecutionConfig()));
} else {
return new TumblingGroupedPreReducer<OUT>(
- clean((ReduceFunction<OUT>) transformation.UDF), groupByKey,
- getType().createSerializer(getExecutionConfig()));
+ clean((ReduceFunction<OUT>) transformation.getUDF()), groupByKey, getType()
+ .createSerializer(getExecutionConfig()));
}
}
return new BasicWindowBuffer<OUT>();
@@ -678,15 +661,4 @@ public class WindowedDataStream<OUT> {
protected WindowedDataStream<OUT> copy() {
return new WindowedDataStream<OUT>(this);
}
-
- protected static class WindowKey<R> implements KeySelector<StreamWindow<R>, Integer> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer getKey(StreamWindow<R> value) throws Exception {
- return value.windowID;
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2be00ac7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
index 75f7d9d..72be823 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
@@ -52,10 +52,13 @@ public class WindowBufferInvokable<T> extends ChainableInvokable<WindowEvent<T>,
protected void handleWindowEvent(WindowEvent<T> windowEvent, WindowBuffer<T> buffer)
throws Exception {
if (windowEvent.isElement()) {
+ System.out.println("element: " + windowEvent.getElement());
buffer.store(windowEvent.getElement());
} else if (windowEvent.isEviction()) {
+ System.out.println("eviction: " + windowEvent.getEviction());
buffer.evict(windowEvent.getEviction());
} else if (windowEvent.isTrigger()) {
+ System.out.println("trigger");
buffer.emitWindow(collector);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2be00ac7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
new file mode 100644
index 0000000..1e3ba86
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
+
+public class WindowUtils {
+
+ public enum WindowTransformation {
+ REDUCEWINDOW, MAPWINDOW, NONE;
+ private Function UDF;
+
+ public WindowTransformation with(Function UDF) {
+ this.UDF = UDF;
+ return this;
+ }
+
+ public Function getUDF() {
+ return UDF;
+ }
+ }
+
+ public static boolean isParallelPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
+ return (eviction instanceof CountEvictionPolicy && (trigger instanceof CountTriggerPolicy || trigger instanceof TimeTriggerPolicy))
+ || (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy);
+ }
+
+ public static boolean isTimeOnly(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
+ return trigger instanceof TimeTriggerPolicy && eviction instanceof TimeEvictionPolicy;
+ }
+
+ public static boolean isSystemTimeTrigger(TriggerPolicy<?> trigger) {
+ return trigger instanceof TimeTriggerPolicy
+ && ((TimeTriggerPolicy<?>) trigger).timestampWrapper.isDefaultTimestamp();
+ }
+
+ public static class WindowKey<R> implements KeySelector<StreamWindow<R>, Integer> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer getKey(StreamWindow<R> value) throws Exception {
+ return value.windowID;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2be00ac7/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index 2ed0002..25b5f87 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -89,17 +89,6 @@ public class WindowIntegrationTest implements Serializable {
inputs.add(11);
inputs.add(11);
- StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE);
-
- DataStream<Integer> source = env.fromCollection(inputs);
-
- source.window(Count.of(2)).every(Count.of(3)).sum(0).getDiscretizedStream()
- .addSink(new CentralSink1());
-
- source.window(Count.of(4)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
- .flatten().addSink(new CentralSink2());
-
- KeySelector<Integer, ?> key = new ModKey(2);
Timestamp<Integer> ts = new Timestamp<Integer>() {
private static final long serialVersionUID = 1L;
@@ -110,38 +99,50 @@ public class WindowIntegrationTest implements Serializable {
}
};
+ StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE);
+
+ DataStream<Integer> source = env.fromCollection(inputs);
+
+ source.window(Time.of(2, ts)).every(Time.of(3, ts)).sum(0).getDiscretizedStream()
+ .addSink(new CentralSink1());
+
+ source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
+ .flatten().addSink(new CentralSink2());
+
+ KeySelector<Integer, ?> key = new ModKey(2);
+
source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
.addSink(new DistributedSink1());
source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2))
.mapWindow(new IdentityWindowMap()).flatten().addSink(new DistributedSink2());
- source.window(Count.of(2)).every(Count.of(3)).min(0).getDiscretizedStream()
+ source.window(Time.of(2, ts)).every(Time.of(3, ts)).min(0).getDiscretizedStream()
.addSink(new CentralSink3());
source.groupBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream()
.addSink(new DistributedSink3());
- source.window(Count.of(5)).mapWindow(new IdentityWindowMap()).flatten()
+ source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten()
.addSink(new DistributedSink4());
env.execute();
- // sum ( Count of 2 slide 3 )
+ // sum ( Time of 2 slide 3 )
List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
- expected1.add(StreamWindow.fromElements(4));
+ expected1.add(StreamWindow.fromElements(5));
expected1.add(StreamWindow.fromElements(9));
- expected1.add(StreamWindow.fromElements(22));
+ expected1.add(StreamWindow.fromElements(32));
validateOutput(expected1, CentralSink1.windows);
- // Tumbling Count of 4 grouped by mod 2
+ // Tumbling Time of 4 grouped by mod 2
List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
- expected2.add(StreamWindow.fromElements(2, 2));
+ expected2.add(StreamWindow.fromElements(2, 2, 4));
expected2.add(StreamWindow.fromElements(1, 3));
- expected2.add(StreamWindow.fromElements(4, 10));
- expected2.add(StreamWindow.fromElements(5, 11));
- expected2.add(StreamWindow.fromElements(11));
+ expected2.add(StreamWindow.fromElements(5));
+ expected2.add(StreamWindow.fromElements(10));
+ expected2.add(StreamWindow.fromElements(11, 11));
validateOutput(expected2, CentralSink2.windows);
@@ -167,11 +168,11 @@ public class WindowIntegrationTest implements Serializable {
validateOutput(expected4, DistributedSink2.windows);
- // min ( Count of 2 slide 3 )
+ // min ( Time of 2 slide 3 )
List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
- expected5.add(StreamWindow.fromElements(2));
+ expected5.add(StreamWindow.fromElements(1));
expected5.add(StreamWindow.fromElements(4));
- expected5.add(StreamWindow.fromElements(11));
+ expected5.add(StreamWindow.fromElements(10));
validateOutput(expected5, CentralSink3.windows);
@@ -186,8 +187,9 @@ public class WindowIntegrationTest implements Serializable {
validateOutput(expected6, DistributedSink3.windows);
List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>();
- expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4));
- expected7.add(StreamWindow.fromElements(5, 10, 11, 11));
+ expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
+ expected7.add(StreamWindow.fromElements(10));
+ expected7.add(StreamWindow.fromElements(10, 11, 11));
validateOutput(expected7, DistributedSink4.windows);