You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/29 21:03:58 UTC

[25/28] git commit: [streaming] Added customizable timestamps to window operations

[streaming] Added customizable timestamps to window operations


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/0e8bf025
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/0e8bf025
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/0e8bf025

Branch: refs/heads/master
Commit: 0e8bf0252dc98c99727857cde2a4ee507a9d3228
Parents: 1850211
Author: ghermann <re...@gmail.com>
Authored: Thu Aug 28 22:19:58 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 29 21:01:57 2014 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  33 +++++-
 .../api/datastream/GroupedDataStream.java       |  37 ++++++-
 .../operator/BatchReduceInvokable.java          |   2 +-
 .../operator/WindowGroupReduceInvokable.java    |   5 +-
 .../operator/WindowReduceInvokable.java         |  66 ++++++++++--
 .../api/invokable/util/DefaultTimestamp.java    |  34 +++++++
 .../streaming/api/invokable/util/Timestamp.java |  38 +++++++
 .../streaming/state/SlidingWindowState.java     |   6 +-
 .../operator/WindowReduceInvokableTest.java     | 100 +++++++++++++++++++
 9 files changed, 306 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0e8bf025/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 3da939c..a2994dc 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -50,6 +50,8 @@ import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
+import org.apache.flink.streaming.api.invokable.util.DefaultTimestamp;
+import org.apache.flink.streaming.api.invokable.util.Timestamp;
 import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.partitioner.DistributePartitioner;
 import org.apache.flink.streaming.partitioner.FieldsPartitioner;
@@ -414,6 +416,35 @@ public abstract class DataStream<OUT> {
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
 			long windowSize, long slideInterval) {
+		return windowReduce(reducer, windowSize, slideInterval, new DefaultTimestamp<OUT>());
+	}
+
+	/**
+	 * Applies a reduce transformation on preset "time" chunks of the
+	 * DataStream. The transformation calls a {@link GroupReduceFunction} on
+	 * records received during the predefined time window. The window is shifted
+	 * after each reduce call. Each GroupReduceFunction call can return any
+	 * number of elements including none. The time is determined by a
+	 * user-defined timestamp. The user can also extend
+	 * {@link RichGroupReduceFunction} to gain access to other features provided
+	 * by the {@link RichFuntion} interface.
+	 * 
+	 * 
+	 * @param reducer
+	 *            The GroupReduceFunction that is called for each time window.
+	 * @param windowSize
+	 *            SingleOutputStreamOperator The time window to run the reducer
+	 *            on, in milliseconds.
+	 * @param slideInterval
+	 *            The time interval, batch is slid by.
+	 * @param timestamp
+	 *            Timestamp function to retrieve a timestamp from an element.
+	 * @param <R>
+	 *            output type
+	 * @return The transformed DataStream.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
+			long windowSize, long slideInterval, Timestamp<OUT> timestamp) {
 		if (windowSize < 1) {
 			throw new IllegalArgumentException("Window size must be positive");
 		}
@@ -427,7 +458,7 @@ public abstract class DataStream<OUT> {
 				GroupReduceFunction.class, 1);
 
 		return addFunction("batchReduce", reducer, inTypeWrapper, outTypeWrapper,
-				new WindowReduceInvokable<OUT, R>(reducer, windowSize, slideInterval));
+				new WindowReduceInvokable<OUT, R>(reducer, windowSize, slideInterval, timestamp));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0e8bf025/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index bc2ac38..4d0265a 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.util.DefaultTimestamp;
+import org.apache.flink.streaming.api.invokable.util.Timestamp;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 
 /**
@@ -132,7 +134,7 @@ public class GroupedDataStream<OUT> {
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
 			long windowSize) {
-		return windowReduce(reducer, windowSize, windowSize, windowSize);
+		return windowReduce(reducer, windowSize, windowSize);
 	}
 
 	/**
@@ -150,16 +152,43 @@ public class GroupedDataStream<OUT> {
 	 * @param windowSize
 	 *            SingleOutputStreamOperator The time window to run the reducer
 	 *            on, in milliseconds.
-	 * @param slideSize
+	 * @param slideInterval
+	 *            The time interval the batch is slid by.
+	 * @return The transformed DataStream.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
+			long windowSize, long slideInterval) {
+		return windowReduce(reducer, windowSize, slideInterval, new DefaultTimestamp<OUT>());
+	}
+	
+	/**
+	 * Applies a group reduce transformation on preset "time" chunks of the
+	 * grouped data stream in a sliding window fashion. The
+	 * {@link GroupReduceFunction} will receive input values based on the key
+	 * value. Only input values with the same key will go to the same reducer.
+	 * When the reducer has ran for all the values in the batch, the window is
+	 * shifted forward. The time is determined by a
+	 * user-defined timestamp. The user can also extend {@link RichGroupReduceFunction}
+	 * to gain access to other features provided by the {@link RichFuntion}
+	 * interface.
+	 *
+	 * @param reducer
+	 *            The GroupReduceFunction that is called for each time window.
+	 * @param windowSize
+	 *            SingleOutputStreamOperator The time window to run the reducer
+	 *            on, in milliseconds.
+	 * @param slideInterval
 	 *            The time interval the batch is slid by.
+	 * @param timestamp
+	 *            Timestamp function to retrieve a timestamp from an element.
 	 * @return The transformed DataStream.
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
-			long windowSize, long slideInterval, long timeUnitInMillis) {
+			long windowSize, long slideInterval, Timestamp<OUT> timestamp) {
 		return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
 				GroupReduceFunction.class, 0), new FunctionTypeWrapper<R>(reducer,
 				GroupReduceFunction.class, 1), new WindowGroupReduceInvokable<OUT, R>(reducer,
-				windowSize, slideInterval, keyPosition));
+				windowSize, slideInterval, keyPosition, timestamp));
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0e8bf025/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index 693b1c8..acd9f62 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -78,7 +78,7 @@ public class BatchReduceInvokable<IN, OUT> extends StreamOperatorInvokable<IN, O
 		}
 	}
 
-	private void collectOneUnit() throws IOException {
+	protected void collectOneUnit() throws IOException {
 		ArrayList<StreamRecord<IN>> list;
 		list = new ArrayList<StreamRecord<IN>>(listSize);
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0e8bf025/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
index dbba50e..8b658f3 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.streaming.api.invokable.util.Timestamp;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.state.MutableTableState;
 
@@ -33,8 +34,8 @@ public class WindowGroupReduceInvokable<IN, OUT> extends WindowReduceInvokable<I
 	private MutableTableState<Object, List<IN>> values;
 
 	public WindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize,
-			long slideInterval, int keyPosition) {
-		super(reduceFunction, windowSize, slideInterval);
+			long slideInterval, int keyPosition, Timestamp<IN> timestamp) {
+		super(reduceFunction, windowSize, slideInterval, timestamp);
 		this.keyPosition = keyPosition;
 		this.reducer = reduceFunction;
 		values = new MutableTableState<Object, List<IN>>();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0e8bf025/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
index 48652c1..76d1768 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
@@ -17,25 +17,80 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
+import java.io.IOException;
+import java.util.ArrayList;
+
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
 public class WindowReduceInvokable<IN, OUT> extends BatchReduceInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 	private long startTime;
+	private long nextRecordTime;
+	private Timestamp<IN> timestamp;
 
 	public WindowReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize,
-			long slideInterval) {
-		super(reduceFunction, windowSize,slideInterval);
+			long slideInterval, Timestamp<IN> timestamp) {
+		super(reduceFunction, windowSize, slideInterval);
+		this.timestamp = timestamp;
+	}
+
+	@Override
+	protected void immutableInvoke() throws Exception {
+		if ((reuse = recordIterator.next(reuse)) == null) {
+			throw new RuntimeException("DataStream must not be empty");
+		}
+
+		nextRecordTime = timestamp.getTimestamp(reuse.getObject()); // **
+		startTime = nextRecordTime - (nextRecordTime % granularity); // **
+
+		while (reuse != null && !state.isFull()) {
+			collectOneUnit();
+		}
+		reduce();
+
+		while (reuse != null) {
+			for (int i = 0; i < slideSize / granularity; i++) {
+				if (reuse != null) {
+					collectOneUnit();
+				}
+			}
+			reduce();
+		}
+	}
+
+	@Override
+	protected void collectOneUnit() throws IOException {
+		ArrayList<StreamRecord<IN>> list;
+		if (nextRecordTime > startTime + granularity - 1) {
+			list = new ArrayList<StreamRecord<IN>>();
+			startTime += granularity;
+		} else {
+			list = new ArrayList<StreamRecord<IN>>(listSize);
+
+			list.add(reuse);
+			resetReuse();
+
+			while ((reuse = recordIterator.next(reuse)) != null && batchNotFull()) {
+				list.add(reuse);
+				resetReuse();
+			}
+		}
+		state.pushBack(list);
+//		System.out.println(list);
+//		System.out.println(startTime + " - " + (startTime + granularity - 1) + " ("
+//				+ nextRecordTime + ")");
 	}
 
 	@Override
 	protected boolean batchNotFull() {
-		long time = System.currentTimeMillis();
-		if (time - startTime < granularity) {
+		nextRecordTime = timestamp.getTimestamp(reuse.getObject());
+		if (nextRecordTime < startTime + granularity) {
 			return true;
 		} else {
-			startTime = time;
+			startTime += granularity;
 			return false;
 		}
 	}
@@ -43,7 +98,6 @@ public class WindowReduceInvokable<IN, OUT> extends BatchReduceInvokable<IN, OUT
 	@Override
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
-		startTime = System.currentTimeMillis();
 	}
 
 	protected void mutableInvoke() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0e8bf025/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimestamp.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimestamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimestamp.java
new file mode 100644
index 0000000..8276a01
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimestamp.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.util;
+
+/**
+ * Default timestamp function that uses the Java System.currentTimeMillis()
+ * method to retrieve a timestamp.
+ *
+ * @param <T>
+ *            Type of the inputs of the reducing function.
+ */
+public class DefaultTimestamp<T> implements Timestamp<T> {
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public long getTimestamp(T value) {
+		return System.currentTimeMillis();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0e8bf025/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/Timestamp.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/Timestamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/Timestamp.java
new file mode 100644
index 0000000..91758e8
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/Timestamp.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.util;
+
+import java.io.Serializable;
+
+/**
+ * Interface for getting a timestamp from a custom value. Used in window
+ * reduces. In order to work properly, the timestamps must be non-decreasing.
+ *
+ * @param <T>
+ *            Type of the value to create the timestamp from.
+ */
+public interface Timestamp<T> extends Serializable {
+
+	/**
+	 * Values
+	 * @param value
+	 * The value to create the timestamp from
+	 * @return The timestamp
+	 */
+	public long getTimestamp(T value);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0e8bf025/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowState.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowState.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowState.java
index 7852403..60dece2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowState.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowState.java
@@ -80,5 +80,9 @@ public class SlidingWindowState<T> {
 		}
 		return false;
 	}
-
+	
+	@Override
+	public String toString() {
+		return buffer.toString();
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0e8bf025/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
new file mode 100644
index 0000000..3dbabbb
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.util.Collector;
+import org.junit.Before;
+import org.junit.Test;
+
+public class WindowReduceInvokableTest {
+
+	public static final class MySlidingWindowReduce implements GroupReduceFunction<Integer, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void reduce(Iterable<Integer> values, Collector<String> out) throws Exception {
+			for (Integer value : values) {
+				out.collect(value.toString());
+			}
+			out.collect(EOW);
+		}
+	}
+
+	public static final class MyTimestamp implements Timestamp<Integer> {
+		private static final long serialVersionUID = 1L;
+
+		private Iterator<Long> timestamps;
+
+		public MyTimestamp(List<Long> timestamps) {
+			this.timestamps = timestamps.iterator();
+		}
+
+		@Override
+		public long getTimestamp(Integer value) {
+			long ts = timestamps.next();
+			return ts;
+		}
+	}
+
+	private final static String EOW = "|";
+
+	private static List<WindowReduceInvokable<Integer, String>> invokables = new ArrayList<WindowReduceInvokable<Integer, String>>();
+	private static List<List<String>> expectedResults = new ArrayList<List<String>>();
+
+	@Before
+	public void before() {
+		long windowSize = 3;
+		long slideSize = 2;
+		List<Long> timestamps = Arrays.asList(101L, 102L, 103L, 104L, 105L, 106L, 107L, 108L, 109L,
+				110L);
+		expectedResults.add(Arrays.asList("1", "2", "3", EOW, "3", "4", "5", EOW, "5", "6", "7",
+				EOW, "7", "8", "9", EOW, "8", "9", "10", EOW));
+		invokables.add(new WindowReduceInvokable<Integer, String>(new MySlidingWindowReduce(),
+				windowSize, slideSize, new MyTimestamp(timestamps)));
+
+		windowSize = 10;
+		slideSize = 5;
+		timestamps = Arrays.asList(101L, 103L, 121L, 122L, 123L, 124L, 180L, 181L, 185L, 190L);
+		expectedResults.add(Arrays.asList("1", "2", EOW, EOW, EOW, EOW, "3", "4", "5", "6", EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, "7", "8", "9", EOW, "9", "10", EOW));
+		invokables.add(new WindowReduceInvokable<Integer, String>(new MySlidingWindowReduce(),
+				windowSize, slideSize, new MyTimestamp(timestamps)));
+	}
+
+	@Test
+	public void slidingBatchReduceTest() {
+		List<List<String>> actualResults = new ArrayList<List<String>>();
+		
+		for (WindowReduceInvokable<Integer, String> invokable : invokables) {
+			actualResults.add(MockInvokable.createAndExecute(invokable,
+					Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)));
+		}
+
+		assertEquals(expectedResults, actualResults);
+	}
+
+}