You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/29 21:03:58 UTC
[25/28] git commit: [streaming] Added customizable timestamps to
window operations
[streaming] Added customizable timestamps to window operations
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/0e8bf025
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/0e8bf025
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/0e8bf025
Branch: refs/heads/master
Commit: 0e8bf0252dc98c99727857cde2a4ee507a9d3228
Parents: 1850211
Author: ghermann <re...@gmail.com>
Authored: Thu Aug 28 22:19:58 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 29 21:01:57 2014 +0200
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 33 +++++-
.../api/datastream/GroupedDataStream.java | 37 ++++++-
.../operator/BatchReduceInvokable.java | 2 +-
.../operator/WindowGroupReduceInvokable.java | 5 +-
.../operator/WindowReduceInvokable.java | 66 ++++++++++--
.../api/invokable/util/DefaultTimestamp.java | 34 +++++++
.../streaming/api/invokable/util/Timestamp.java | 38 +++++++
.../streaming/state/SlidingWindowState.java | 6 +-
.../operator/WindowReduceInvokableTest.java | 100 +++++++++++++++++++
9 files changed, 306 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0e8bf025/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 3da939c..a2994dc 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -50,6 +50,8 @@ import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
+import org.apache.flink.streaming.api.invokable.util.DefaultTimestamp;
+import org.apache.flink.streaming.api.invokable.util.Timestamp;
import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.partitioner.DistributePartitioner;
import org.apache.flink.streaming.partitioner.FieldsPartitioner;
@@ -414,6 +416,35 @@ public abstract class DataStream<OUT> {
*/
public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
long windowSize, long slideInterval) {
+ return windowReduce(reducer, windowSize, slideInterval, new DefaultTimestamp<OUT>());
+ }
+
+ /**
+ * Applies a reduce transformation on preset "time" chunks of the
+ * DataStream. The transformation calls a {@link GroupReduceFunction} on
+ * records received during the predefined time window. The window is shifted
+ * after each reduce call. Each GroupReduceFunction call can return any
+ * number of elements including none. The time is determined by a
+ * user-defined timestamp. The user can also extend
+ * {@link RichGroupReduceFunction} to gain access to other features provided
+ * by the {@link RichFunction} interface.
+ *
+ *
+ * @param reducer
+ * The GroupReduceFunction that is called for each time window.
+ * @param windowSize
+ * The time window to run the reducer
+ * on, in milliseconds.
+ * @param slideInterval
+ * The time interval the batch is slid by.
+ * @param timestamp
+ * Timestamp function to retrieve a timestamp from an element.
+ * @param <R>
+ * output type
+ * @return The transformed DataStream.
+ */
+ public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
+ long windowSize, long slideInterval, Timestamp<OUT> timestamp) {
if (windowSize < 1) {
throw new IllegalArgumentException("Window size must be positive");
}
@@ -427,7 +458,7 @@ public abstract class DataStream<OUT> {
GroupReduceFunction.class, 1);
return addFunction("batchReduce", reducer, inTypeWrapper, outTypeWrapper,
- new WindowReduceInvokable<OUT, R>(reducer, windowSize, slideInterval));
+ new WindowReduceInvokable<OUT, R>(reducer, windowSize, slideInterval, timestamp));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0e8bf025/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index bc2ac38..4d0265a 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.java.functions.RichReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.util.DefaultTimestamp;
+import org.apache.flink.streaming.api.invokable.util.Timestamp;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
/**
@@ -132,7 +134,7 @@ public class GroupedDataStream<OUT> {
*/
public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
long windowSize) {
- return windowReduce(reducer, windowSize, windowSize, windowSize);
+ return windowReduce(reducer, windowSize, windowSize);
}
/**
@@ -150,16 +152,43 @@ public class GroupedDataStream<OUT> {
* @param windowSize
* SingleOutputStreamOperator The time window to run the reducer
* on, in milliseconds.
- * @param slideSize
+ * @param slideInterval
+ * The time interval the batch is slid by.
+ * @return The transformed DataStream.
+ */
+ public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
+ long windowSize, long slideInterval) {
+ return windowReduce(reducer, windowSize, slideInterval, new DefaultTimestamp<OUT>());
+ }
+
+ /**
+ * Applies a group reduce transformation on preset "time" chunks of the
+ * grouped data stream in a sliding window fashion. The
+ * {@link GroupReduceFunction} will receive input values based on the key
+ * value. Only input values with the same key will go to the same reducer.
+ * When the reducer has run for all the values in the batch, the window is
+ * shifted forward. The time is determined by a
+ * user-defined timestamp. The user can also extend {@link RichGroupReduceFunction}
+ * to gain access to other features provided by the {@link RichFunction}
+ * interface.
+ *
+ * @param reducer
+ * The GroupReduceFunction that is called for each time window.
+ * @param windowSize
+ * The time window to run the reducer
+ * on, in milliseconds.
+ * @param slideInterval
* The time interval the batch is slid by.
+ * @param timestamp
+ * Timestamp function to retrieve a timestamp from an element.
* @return The transformed DataStream.
*/
public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
- long windowSize, long slideInterval, long timeUnitInMillis) {
+ long windowSize, long slideInterval, Timestamp<OUT> timestamp) {
return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
GroupReduceFunction.class, 0), new FunctionTypeWrapper<R>(reducer,
GroupReduceFunction.class, 1), new WindowGroupReduceInvokable<OUT, R>(reducer,
- windowSize, slideInterval, keyPosition));
+ windowSize, slideInterval, keyPosition, timestamp));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0e8bf025/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index 693b1c8..acd9f62 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -78,7 +78,7 @@ public class BatchReduceInvokable<IN, OUT> extends StreamOperatorInvokable<IN, O
}
}
- private void collectOneUnit() throws IOException {
+ protected void collectOneUnit() throws IOException {
ArrayList<StreamRecord<IN>> list;
list = new ArrayList<StreamRecord<IN>>(listSize);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0e8bf025/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
index dbba50e..8b658f3 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.streaming.api.invokable.util.Timestamp;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.MutableTableState;
@@ -33,8 +34,8 @@ public class WindowGroupReduceInvokable<IN, OUT> extends WindowReduceInvokable<I
private MutableTableState<Object, List<IN>> values;
public WindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize,
- long slideInterval, int keyPosition) {
- super(reduceFunction, windowSize, slideInterval);
+ long slideInterval, int keyPosition, Timestamp<IN> timestamp) {
+ super(reduceFunction, windowSize, slideInterval, timestamp);
this.keyPosition = keyPosition;
this.reducer = reduceFunction;
values = new MutableTableState<Object, List<IN>>();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0e8bf025/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
index 48652c1..76d1768 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
@@ -17,25 +17,80 @@
package org.apache.flink.streaming.api.invokable.operator;
+import java.io.IOException;
+import java.util.ArrayList;
+
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
public class WindowReduceInvokable<IN, OUT> extends BatchReduceInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
private long startTime;
+ private long nextRecordTime;
+ private Timestamp<IN> timestamp;
public WindowReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize,
- long slideInterval) {
- super(reduceFunction, windowSize,slideInterval);
+ long slideInterval, Timestamp<IN> timestamp) {
+ super(reduceFunction, windowSize, slideInterval);
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ protected void immutableInvoke() throws Exception {
+ if ((reuse = recordIterator.next(reuse)) == null) {
+ throw new RuntimeException("DataStream must not be empty");
+ }
+
+ nextRecordTime = timestamp.getTimestamp(reuse.getObject()); // **
+ startTime = nextRecordTime - (nextRecordTime % granularity); // **
+
+ while (reuse != null && !state.isFull()) {
+ collectOneUnit();
+ }
+ reduce();
+
+ while (reuse != null) {
+ for (int i = 0; i < slideSize / granularity; i++) {
+ if (reuse != null) {
+ collectOneUnit();
+ }
+ }
+ reduce();
+ }
+ }
+
+ @Override
+ protected void collectOneUnit() throws IOException {
+ ArrayList<StreamRecord<IN>> list;
+ if (nextRecordTime > startTime + granularity - 1) {
+ list = new ArrayList<StreamRecord<IN>>();
+ startTime += granularity;
+ } else {
+ list = new ArrayList<StreamRecord<IN>>(listSize);
+
+ list.add(reuse);
+ resetReuse();
+
+ while ((reuse = recordIterator.next(reuse)) != null && batchNotFull()) {
+ list.add(reuse);
+ resetReuse();
+ }
+ }
+ state.pushBack(list);
+// System.out.println(list);
+// System.out.println(startTime + " - " + (startTime + granularity - 1) + " ("
+// + nextRecordTime + ")");
}
@Override
protected boolean batchNotFull() {
- long time = System.currentTimeMillis();
- if (time - startTime < granularity) {
+ nextRecordTime = timestamp.getTimestamp(reuse.getObject());
+ if (nextRecordTime < startTime + granularity) {
return true;
} else {
- startTime = time;
+ startTime += granularity;
return false;
}
}
@@ -43,7 +98,6 @@ public class WindowReduceInvokable<IN, OUT> extends BatchReduceInvokable<IN, OUT
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
- startTime = System.currentTimeMillis();
}
protected void mutableInvoke() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0e8bf025/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimestamp.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimestamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimestamp.java
new file mode 100644
index 0000000..8276a01
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimestamp.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.util;
+
+/**
+ * Default timestamp function that uses the Java System.currentTimeMillis()
+ * method to retrieve a timestamp.
+ *
+ * @param <T>
+ * Type of the inputs of the reducing function.
+ */
+public class DefaultTimestamp<T> implements Timestamp<T> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(T value) {
+ return System.currentTimeMillis();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0e8bf025/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/Timestamp.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/Timestamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/Timestamp.java
new file mode 100644
index 0000000..91758e8
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/Timestamp.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.util;
+
+import java.io.Serializable;
+
+/**
+ * Interface for getting a timestamp from a custom value. Used in window
+ * reduces. In order to work properly, the timestamps must be non-decreasing.
+ *
+ * @param <T>
+ * Type of the value to create the timestamp from.
+ */
+public interface Timestamp<T> extends Serializable {
+
+ /**
+ * Returns the timestamp of the given value.
+ * @param value
+ * The value to create the timestamp from
+ * @return The timestamp
+ */
+ public long getTimestamp(T value);
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0e8bf025/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowState.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowState.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowState.java
index 7852403..60dece2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowState.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowState.java
@@ -80,5 +80,9 @@ public class SlidingWindowState<T> {
}
return false;
}
-
+
+ @Override
+ public String toString() {
+ return buffer.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0e8bf025/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
new file mode 100644
index 0000000..3dbabbb
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.util.Collector;
+import org.junit.Before;
+import org.junit.Test;
+
+public class WindowReduceInvokableTest {
+
+ public static final class MySlidingWindowReduce implements GroupReduceFunction<Integer, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void reduce(Iterable<Integer> values, Collector<String> out) throws Exception {
+ for (Integer value : values) {
+ out.collect(value.toString());
+ }
+ out.collect(EOW);
+ }
+ }
+
+ public static final class MyTimestamp implements Timestamp<Integer> {
+ private static final long serialVersionUID = 1L;
+
+ private Iterator<Long> timestamps;
+
+ public MyTimestamp(List<Long> timestamps) {
+ this.timestamps = timestamps.iterator();
+ }
+
+ @Override
+ public long getTimestamp(Integer value) {
+ long ts = timestamps.next();
+ return ts;
+ }
+ }
+
+ private final static String EOW = "|";
+
+ private static List<WindowReduceInvokable<Integer, String>> invokables = new ArrayList<WindowReduceInvokable<Integer, String>>();
+ private static List<List<String>> expectedResults = new ArrayList<List<String>>();
+
+ @Before
+ public void before() {
+ long windowSize = 3;
+ long slideSize = 2;
+ List<Long> timestamps = Arrays.asList(101L, 102L, 103L, 104L, 105L, 106L, 107L, 108L, 109L,
+ 110L);
+ expectedResults.add(Arrays.asList("1", "2", "3", EOW, "3", "4", "5", EOW, "5", "6", "7",
+ EOW, "7", "8", "9", EOW, "8", "9", "10", EOW));
+ invokables.add(new WindowReduceInvokable<Integer, String>(new MySlidingWindowReduce(),
+ windowSize, slideSize, new MyTimestamp(timestamps)));
+
+ windowSize = 10;
+ slideSize = 5;
+ timestamps = Arrays.asList(101L, 103L, 121L, 122L, 123L, 124L, 180L, 181L, 185L, 190L);
+ expectedResults.add(Arrays.asList("1", "2", EOW, EOW, EOW, EOW, "3", "4", "5", "6", EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, "7", "8", "9", EOW, "9", "10", EOW));
+ invokables.add(new WindowReduceInvokable<Integer, String>(new MySlidingWindowReduce(),
+ windowSize, slideSize, new MyTimestamp(timestamps)));
+ }
+
+ @Test
+ public void slidingBatchReduceTest() {
+ List<List<String>> actualResults = new ArrayList<List<String>>();
+
+ for (WindowReduceInvokable<Integer, String> invokable : invokables) {
+ actualResults.add(MockInvokable.createAndExecute(invokable,
+ Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)));
+ }
+
+ assertEquals(expectedResults, actualResults);
+ }
+
+}