Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:19 UTC

[42/51] [abbrv] git commit: [streaming] Implemented sliding window and batchReduce

[streaming] Implemented sliding window and batchReduce


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e089959c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e089959c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e089959c

Branch: refs/heads/master
Commit: e089959c0e4bb3e08a95b1b6b9076935c87a5a02
Parents: 309727e
Author: ghermann <re...@gmail.com>
Authored: Thu Aug 7 20:33:50 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:23:43 2014 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  41 +++++-
 .../operator/BatchReduceInvokable.java          | 135 ++++++++---------
 .../operator/WindowReduceInvokable.java         | 118 ++++++---------
 .../streaming/state/SlidingWindowState.java     |  42 ++++--
 .../state/SlidingWindowStateIterator.java       |  41 +++++-
 .../api/invokable/operator/BatchReduceTest.java | 143 +++++++++++++++----
 .../streaming/state/SlidingWindowStateTest.java |  77 ++++++++++
 7 files changed, 396 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e089959c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index bec55e0..531f43c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -326,7 +326,34 @@ public abstract class DataStream<OUT> {
 			int batchSize) {
 		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT, Tuple, R>(reducer,
 				GroupReduceFunction.class, 0, -1, 1), new BatchReduceInvokable<OUT, R>(reducer,
-				batchSize));
+				batchSize, batchSize));
+	}
+
+	/**
+	 * Applies a reduce transformation on sliding batches of the DataStream.
+	 * The transformation calls a {@link GroupReduceFunction} for each tuple
+	 * batch of the predefined size; after each call the batch is slid forward
+	 * by the given number of tuples. Each GroupReduceFunction call can return
+	 * any number of elements including none. The user can also extend
+	 * {@link RichGroupReduceFunction} to gain access to other features provided
+	 * by the {@link RichFunction} interface.
+	 *
+	 * @param reducer
+	 *            The GroupReduceFunction that is called for each tuple batch.
+	 * @param batchSize
+	 *            The number of tuples grouped together in the batch.
+	 * @param slideSize
+	 *            The number of tuples the batch is slid by.
+	 * @param <R>
+	 *            output type
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> batchReduce(GroupReduceFunction<OUT, R> reducer,
+			int batchSize, int slideSize) {
+		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT, Tuple, R>(reducer,
+				GroupReduceFunction.class, 0, -1, 1), new BatchReduceInvokable<OUT, R>(reducer,
+				batchSize, slideSize));
 	}
 
 	/**
@@ -342,7 +369,8 @@ public abstract class DataStream<OUT> {
 	 * @param reducer
 	 *            The GroupReduceFunction that is called for each time window.
 	 * @param windowSize
-	 *            The time window to run the reducer on, in milliseconds.
+	 *            The time window to run the reducer on, in milliseconds.
 	 * @param <R>
 	 *            output type
 	 * @return The transformed DataStream.
@@ -351,7 +379,14 @@ public abstract class DataStream<OUT> {
 			long windowSize) {
 		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT, Tuple, R>(reducer,
 				GroupReduceFunction.class, 0, -1, 1), new WindowReduceInvokable<OUT, R>(reducer,
-				windowSize));
+				windowSize, windowSize, windowSize));
+	}
+
+	public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
+			long windowSize, long slideInterval, long timeUnitInMillis) {
+		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT, Tuple, R>(reducer,
+				GroupReduceFunction.class, 0, -1, 1), new WindowReduceInvokable<OUT, R>(reducer,
+				windowSize, slideInterval, timeUnitInMillis));
 	}
 
 	/**
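
A minimal usage sketch of the two new overloads above, for orientation: the first call
mirrors the sliding-batch pipeline exercised in BatchReduceTest further below, while the
second call is illustrative only -- someDataStream and MyWindowReduce are hypothetical
placeholders and the millisecond values are chosen arbitrarily.

    // count-based sliding batch: reduce over batches of 9 elements, slid by 6 each time
    env.generateSequence(1, 30)
            .batchReduce(new MySlidingBatchReduce(), 9, 6)
            .addSink(new MySlidingSink());

    // time-based sliding window: a 10000 ms window slid every 5000 ms, with records
    // gathered in 1000 ms units (windowSize and slideInterval are assumed to be
    // divisible by timeUnitInMillis)
    someDataStream.windowReduce(new MyWindowReduce(), 10000, 5000, 1000);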

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e089959c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index 4e0a7a5..ffd4f1a 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -20,106 +20,87 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import java.util.ArrayList;
-import java.util.List;
 
+import org.apache.commons.math.util.MathUtils;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.state.SlidingWindowState;
 
 public class BatchReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 	private int batchSize;
-
-	public BatchReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, int batchSize) {
-		super(reduceFunction);
+	private int slideSize;
+	private int granularity;
+	private boolean emitted;
+	private transient SlidingWindowState<IN> state;
+
+	public BatchReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, int batchSize,
+			int slideSize) {
+		super(reduceFunction);		
 		this.reducer = reduceFunction;
 		this.batchSize = batchSize;
+		this.slideSize = slideSize;
+		this.granularity = MathUtils.gcd(batchSize, slideSize);
 	}
 
 	@Override
 	protected void immutableInvoke() throws Exception {
-		List<IN> tupleBatch = new ArrayList<IN>();
-		boolean batchStart;
-		int counter = 0;
-
-		while (loadNextRecord() != null) {
-			batchStart = true;
-			do {
-				if (batchStart) {
-					batchStart = false;
-				} else {
-					reuse = loadNextRecord();
-					if (reuse == null) {
-						break;
-					}
-				}
-				counter++;
-				tupleBatch.add(reuse.getObject());
-				resetReuse();
-			} while (counter < batchSize);
-			reducer.reduce(tupleBatch, collector);
-			tupleBatch.clear();
-			counter = 0;
-		}
-
-	}
-
-	@Override
-	protected void mutableInvoke() throws Exception {
-		userIterator = new CounterIterator();
-
-		do {
-			if (userIterator.hasNext()) {
-				reducer.reduce(userIterable, collector);
-				userIterator.reset();
+		reuse = loadNextRecord();
+		ArrayList<IN> list;
+
+		while (!state.isFull()) {
+			list = new ArrayList<IN>(granularity);
+			try {
+				state.pushBack(fillArray(list));
+			} catch (NullPointerException e) {
+				throw new RuntimeException("DataStream length must be greater than batchsize");
 			}
-		} while (reuse != null);
-	}
-
-	private class CounterIterator implements BatchIterator<IN> {
-		private int counter;
-		private boolean loadedNext;
-
-		public CounterIterator() {
-			counter = 1;
 		}
 
-		@Override
-		public boolean hasNext() {
-			if (counter > batchSize) {
-				return false;
-			} else if (!loadedNext) {
-				loadNextRecord();
-				loadedNext = true;
-			}
-			return (reuse != null);
-		}
+		boolean go = reduce();
 
-		@Override
-		public IN next() {
-			if (hasNext()) {
-				counter++;
-				loadedNext = false;
-				return reuse.getObject();
+		while (go) {
+			if (state.isEmittable()) {
+				go = reduce();
 			} else {
-				counter++;
-				loadedNext = false;
-				return null;
+				list = (ArrayList<IN>) state.popFront();
+				list.clear();
+				state.pushBack(fillArray(list));
+				emitted = false;
+				go = reuse != null;
 			}
 		}
-
-		public void reset() {
-			for (int i = 0; i < (batchSize - counter); i++) {
-				loadNextRecord();
-			}
-			loadNextRecord();
-			loadedNext = true;
-			counter = 1;
+		if (!emitted) {
+			reduce();
 		}
+	}
 
-		@Override
-		public void remove() {
+	private boolean reduce() throws Exception {
+		userIterator = state.getIterator();
+		reducer.reduce(userIterable, collector);
+		emitted = true;
+		return reuse != null;
+	}
 
-		}
+	private ArrayList<IN> fillArray(ArrayList<IN> list) {
+		int counter = 0;
+		do {
+			counter++;
+			list.add(reuse.getObject());
+			resetReuse();
+		} while ((reuse = loadNextRecord()) != null && counter < granularity);
+		return list;
+	}
 
+	@Override
+	protected void mutableInvoke() throws Exception {
+		throw new RuntimeException("Reducing mutable sliding batch is not supported.");
+	}
+	
+	@Override
+	public void open(Configuration parameters) throws Exception{
+		super.open(parameters);
+		this.state = new SlidingWindowState<IN>(batchSize, slideSize, granularity);
 	}
 
 }
\ No newline at end of file
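
A brief sketch of the chunking arithmetic used by the invokable above (the numbers and
the local variable names are illustrative only): incoming records are buffered in chunks
of gcd(batchSize, slideSize) elements, so a full batch spans batchSize / granularity
chunks and each slide replaces slideSize / granularity of them in the circular buffer.

    int batchSize = 9, slideSize = 6;
    int granularity = MathUtils.gcd(batchSize, slideSize); // 3 elements per stored chunk
    int chunksPerBatch = batchSize / granularity;           // 3 chunks form a full batch
    int chunksPerSlide = slideSize / granularity;           // 2 chunks replaced per slide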

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e089959c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
index 3405641..cbc242c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
@@ -20,105 +20,71 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.state.SlidingWindowState;
 
 public class WindowReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 	private long windowSize;
+	private long slideInterval;
+	private long timeUnitInMillis;
+	private transient SlidingWindowState<IN> state;
 	volatile boolean isRunning;
-	boolean window;
 
-	public WindowReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize) {
+	public WindowReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize,
+			long slideInterval, long timeUnitInMillis) {
 		super(reduceFunction);
-		this.reducer = reduceFunction;
 		this.windowSize = windowSize;
-		this.window = true;
+		this.slideInterval = slideInterval;
+		this.timeUnitInMillis = timeUnitInMillis;
 	}
 
 	protected void immutableInvoke() throws Exception {
-		List<IN> tupleBatch = new ArrayList<IN>();
-		boolean batchStart;
-
-		long startTime = System.currentTimeMillis();
-		while (loadNextRecord() != null) {
-			batchStart = true;
-			do {
-				if (batchStart) {
-					batchStart = false;
-				} else {
-					reuse = loadNextRecord();
-					if (reuse == null) {
-						break;
-					}
-				}
-				tupleBatch.add(reuse.getObject());
-				resetReuse();
-			} while (System.currentTimeMillis() - startTime < windowSize);
-			reducer.reduce(tupleBatch, collector);
-			tupleBatch.clear();
-			startTime = System.currentTimeMillis();
+		if ((reuse = loadNextRecord()) == null) {
+			throw new RuntimeException("DataStream must not be empty");
 		}
 
-	}
-
-	protected void mutableInvoke() throws Exception {
-		userIterator = new WindowIterator();
-
-		do {
-			if (userIterator.hasNext()) {
-				reducer.reduce(userIterable, collector);
-				userIterator.reset();
-			}
-		} while (reuse != null);
-	}
-
-	private class WindowIterator implements BatchIterator<IN> {
-
-		private boolean loadedNext;
-		private long startTime;
-
-		public WindowIterator() {
-			startTime = System.currentTimeMillis();
+		while (reuse != null && !state.isFull()) {
+			collectOneTimeUnit();
 		}
+		reduce();
 
-		@Override
-		public boolean hasNext() {
-			if (System.currentTimeMillis() - startTime > windowSize) {
-				return false;
-			} else if (!loadedNext) {
-				loadNextRecord();
-				loadedNext = true;
-			}
-			return (reuse != null);
-		}
-
-		@Override
-		public IN next() {
-			if (hasNext()) {
-				loadedNext = false;
-				return reuse.getObject();
-			} else {
-				loadedNext = false;
-				return reuse.getObject();
+		while (reuse != null) {
+			for (int i = 0; i < slideInterval / timeUnitInMillis; i++) {
+				collectOneTimeUnit();
 			}
+			reduce();
 		}
+	}
 
-		public void reset() {
-			while (System.currentTimeMillis() - startTime < windowSize) {
-				loadNextRecord();
-			}
-			loadNextRecord();
-			loadedNext = true;
-			startTime = System.currentTimeMillis();
-		}
+	private void collectOneTimeUnit() {
+		ArrayList<IN> list;
+		list = new ArrayList<IN>();
+		long startTime = System.currentTimeMillis();
 
-		@Override
-		public void remove() {
+		do {
+			list.add(reuse.getObject());
+			resetReuse();
+		} while ((reuse = loadNextRecord()) != null
+				&& System.currentTimeMillis() - startTime < timeUnitInMillis);
+		state.pushBack(list);
+	}
 
-		}
+	private boolean reduce() throws Exception {
+		userIterator = state.forceGetIterator();
+		reducer.reduce(userIterable, collector);
+		return reuse != null;
+	}
 
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		this.state = new SlidingWindowState<IN>(windowSize, slideInterval, timeUnitInMillis);
 	}
 
+	protected void mutableInvoke() throws Exception {
+		throw new RuntimeException("Reducing mutable sliding window is not supported.");
+	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e089959c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowState.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowState.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowState.java
index 600b69f..a062ba9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowState.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowState.java
@@ -20,43 +20,43 @@
 package org.apache.flink.streaming.state;
 
 import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.collections.buffer.CircularFifoBuffer;
 
-import org.apache.flink.api.java.tuple.Tuple;
-
 /**
  * The window state for window operator. To be general enough, this class
  * implements a count based window operator. It is possible for the user to
  * compose time based window operator by extending this class by splitting the
  * stream into multiple mini batches.
  */
-public class SlidingWindowState<InTuple extends Tuple> implements Serializable {
+public class SlidingWindowState<T> implements Serializable {
 	private static final long serialVersionUID = -2376149970115888901L;
-	private int currentRecordCount;
+	private long currentRecordCount;
 	private int fullRecordCount;
 	private int slideRecordCount;
+	private SlidingWindowStateIterator<T> iterator;
 
-	CircularFifoBuffer buffer;
+	private CircularFifoBuffer buffer;
 
-	public SlidingWindowState(int windowSize, int slidingStep, int computeGranularity) {
+	public SlidingWindowState(long windowSize, long slideInterval, long timeUnitInMillis) {
 		this.currentRecordCount = 0;
 		// here we assume that windowSize and slidingStep is divisible by
-		// computeGranularity.
-		this.fullRecordCount = windowSize / computeGranularity;
-		this.slideRecordCount = slidingStep / computeGranularity;
+		// timeUnitInMillis.
+		this.fullRecordCount = (int) (windowSize / timeUnitInMillis);
+		this.slideRecordCount = (int) (slideInterval / timeUnitInMillis);
 		this.buffer = new CircularFifoBuffer(fullRecordCount);
+		this.iterator = new SlidingWindowStateIterator<T>(buffer);
 	}
 
-	public void pushBack(ArrayList<InTuple> tupleArray) {
-		buffer.add(tupleArray);
+	public void pushBack(List<T> array) {
+		buffer.add(array);
 		currentRecordCount += 1;
 	}
 
 	@SuppressWarnings("unchecked")
-	public ArrayList<InTuple> popFront() {
-		ArrayList<InTuple> frontRecord = (ArrayList<InTuple>) buffer.get();
+	public List<T> popFront() {
+		List<T> frontRecord = (List<T>) buffer.get();
 		buffer.remove();
 		return frontRecord;
 	}
@@ -65,6 +65,20 @@ public class SlidingWindowState<InTuple extends Tuple> implements Serializable {
 		return currentRecordCount >= fullRecordCount;
 	}
 
+	public SlidingWindowStateIterator<T> getIterator() {
+		if (isFull()) {
+			iterator.reset();
+			return iterator;
+		} else {
+			return null;
+		}
+	}
+
+	public SlidingWindowStateIterator<T> forceGetIterator() {
+		iterator.reset();
+		return iterator;
+	}
+
 	public boolean isEmittable() {
 		if (currentRecordCount == fullRecordCount + slideRecordCount) {
 			currentRecordCount -= slideRecordCount;
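
A minimal sketch of how this count-based state is meant to be driven, mirroring the
SlidingWindowStateTest added at the bottom of this commit (a window of 3 chunks, slid
by 2, with one element per chunk):

    SlidingWindowState<Integer> state = new SlidingWindowState<Integer>(3, 2, 1);
    state.pushBack(Arrays.asList(0));
    state.pushBack(Arrays.asList(1));
    state.pushBack(Arrays.asList(2));   // isFull() is now true: the first window is ready
    // ... reduce over state.getIterator() ...
    state.pushBack(Arrays.asList(3));
    state.pushBack(Arrays.asList(4));   // isEmittable() is now true: the window has slid by 2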

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e089959c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowStateIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowStateIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowStateIterator.java
index 69c9a48..6033276 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowStateIterator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowStateIterator.java
@@ -19,18 +19,45 @@
 
 package org.apache.flink.streaming.state;
 
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import java.util.Collection;
+import java.util.Iterator;
 
-public class SlidingWindowStateIterator<K>{
+import org.apache.commons.collections.buffer.CircularFifoBuffer;
+import org.apache.flink.streaming.api.invokable.operator.BatchIterator;
+
+public class SlidingWindowStateIterator<T> implements BatchIterator<T> {
+
+	private CircularFifoBuffer buffer;
+	private Iterator<Collection<T>> iterator;
+	private Iterator<T> subIterator;
+	
+	public SlidingWindowStateIterator(CircularFifoBuffer buffer) {
+		this.buffer = buffer;
+	}
 
 	public boolean hasNext() {
-		return false;
+		return subIterator.hasNext();
 	}
 
-	public Tuple2<K, StreamRecord<Tuple>> next() {
-		return null;
+	public T next() {
+		T nextElement = subIterator.next();
+		if (!subIterator.hasNext()) {
+			if (iterator.hasNext()) {
+				subIterator = iterator.next().iterator();
+			}
+		}
+		return nextElement;
 	}
 
+	@Override
+	public void remove() {
+		throw new RuntimeException("Cannot use remove on reducing iterator.");
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void reset() {
+		iterator = buffer.iterator();
+		subIterator = iterator.next().iterator();
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e089959c/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
index c91878b..bf7621e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
@@ -22,10 +22,14 @@ package org.apache.flink.streaming.api.invokable.operator;
 import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
@@ -39,64 +43,155 @@ public class BatchReduceTest {
 
 	private static ArrayList<Double> avgs = new ArrayList<Double>();
 	private static final int BATCH_SIZE = 5;
-	private static final int PARALlELISM = 1;
+	private static final int PARALLELISM = 1;
 	private static final long MEMORYSIZE = 32;
 
-	public static final class MyBatchReduce implements
-			GroupReduceFunction<Tuple1<Double>, Tuple1<Double>> {
+	public static final class MyBatchReduce implements GroupReduceFunction<Double, Double> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void reduce(Iterable<Tuple1<Double>> values, Collector<Tuple1<Double>> out)
-				throws Exception {
+		public void reduce(Iterable<Double> values, Collector<Double> out) throws Exception {
 
 			Double sum = 0.;
 			Double count = 0.;
-			for (Tuple1<Double> value : values) {
-				sum += value.f0;
+			for (Double value : values) {
+				sum += value;
 				count++;
 			}
 			if (count > 0) {
-				out.collect(new Tuple1<Double>(sum / count));
+				out.collect(new Double(sum / count));
 			}
 		}
 	}
 
-	public static final class MySink implements SinkFunction<Tuple1<Double>> {
+	public static final class MySink implements SinkFunction<Double> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Tuple1<Double> tuple) {
-			avgs.add(tuple.f0);
+		public void invoke(Double tuple) {
+			avgs.add(tuple);
 		}
 
 	}
 
-	public static final class MySource implements SourceFunction<Tuple1<Double>> {
+	public static final class MySource implements SourceFunction<Double> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Collector<Tuple1<Double>> collector) {
+		public void invoke(Collector<Double> collector) {
 			for (Double i = 1.; i <= 100; i++) {
-				collector.collect(new Tuple1<Double>(i));
+				collector.collect(new Double(i));
 			}
 		}
 	}
 
-	@Test
-	public void test() throws Exception {
-		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
+	public static final class MySlidingBatchReduce implements RichFunction,
+			GroupReduceFunction<Long, String> {
+		private static final long serialVersionUID = 1L;
 
-		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(PARALlELISM);
+		double startTime;
 
-		@SuppressWarnings("unused")
-		DataStream<Tuple1<Double>> dataStream = env.addSource(new MySource())
-				.batchReduce(new MyBatchReduce(), BATCH_SIZE).addSink(new MySink());
+		@Override
+		public void reduce(Iterable<Long> values, Collector<String> out) throws Exception {
+			for (Long value : values) {
+				out.collect(value.toString());
+			}
+			out.collect(END_OF_BATCH);
+		}
 
-		env.executeTest(MEMORYSIZE);
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			startTime = (double) System.currentTimeMillis() / 1000;
+		}
+
+		@Override
+		public void close() throws Exception {
+		}
+
+		@Override
+		public RuntimeContext getRuntimeContext() {
+			return null;
+		}
+
+		@Override
+		public void setRuntimeContext(RuntimeContext t) {
+			// TODO Auto-generated method stub
+
+		}
+	}
+
+	private static List<SortedSet<String>> sink = new ArrayList<SortedSet<String>>();
+	private static final String END_OF_BATCH = "end of batch";
+
+	public static final class MySlidingSink implements SinkFunction<String> {
 
+		private static final long serialVersionUID = 1L;
+
+		SortedSet<String> currentSet = new TreeSet<String>();
+
+		@Override
+		public void invoke(String string) {
+			if (string.equals(END_OF_BATCH)) {
+				sink.add(currentSet);
+				currentSet = new TreeSet<String>();
+			} else {
+				currentSet.add(string);
+			}
+		}
+	}
+
+	private final static int SLIDING_BATCH_SIZE = 9;
+	private final static int SLIDE_SIZE = 6;
+	private static final int SEQUENCE_SIZE = 30;
+	private LocalStreamEnvironment env;
+	
+	private void slidingStream() {
+		env.generateSequence(1, SEQUENCE_SIZE)
+		.batchReduce(new MySlidingBatchReduce(), SLIDING_BATCH_SIZE, SLIDE_SIZE)
+		.addSink(new MySlidingSink());
+	}
+	
+	private void slidingTest() {
+		int firstInBatch = 1;
+
+		for (SortedSet<String> set : sink) {
+			int to = Math.min(firstInBatch + SLIDING_BATCH_SIZE - 1, SEQUENCE_SIZE);
+			assertEquals(getExpectedSet(to), set);
+			firstInBatch += SLIDE_SIZE;
+		}
+	}
+	
+	private void nonSlidingStream() {
+		env.addSource(new MySource()).batchReduce(new MyBatchReduce(), BATCH_SIZE)
+		.addSink(new MySink());
+	}
+	
+	private void nonSlidingTest() {
 		for (int i = 0; i < avgs.size(); i++) {
 			assertEquals(3.0 + i * BATCH_SIZE, avgs.get(i), 0);
 		}
 	}
+	
+	@Test
+	public void test() {
+		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
+
+		env = StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM);
+
+		slidingStream();
+		nonSlidingStream();
+		
+		env.executeTest(MEMORYSIZE);
+
+		slidingTest();
+		nonSlidingTest();
+	}
+
+	private SortedSet<String> getExpectedSet(int to) {
+		SortedSet<String> expectedSet = new TreeSet<String>();
+		for (int i = to; i > to - SLIDING_BATCH_SIZE; i--) {
+			expectedSet.add(Integer.toString(i));
+		}
+		return expectedSet;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e089959c/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/SlidingWindowStateTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/SlidingWindowStateTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/SlidingWindowStateTest.java
new file mode 100644
index 0000000..15902f4
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/SlidingWindowStateTest.java
@@ -0,0 +1,77 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.junit.Test;
+
+public class SlidingWindowStateTest {
+
+	private final static int SLIDING_BATCH_SIZE = 3;
+	private final static int SLIDE_SIZE = 2;
+	private static final int UNIT = 1;
+
+	@Test
+	public void test() {
+		SlidingWindowState<Integer> state = new SlidingWindowState<Integer>(SLIDING_BATCH_SIZE,
+				SLIDE_SIZE, UNIT);
+		state.pushBack(Arrays.asList(0));
+		state.pushBack(Arrays.asList(1));
+		assertEquals(false, state.isFull());
+		state.pushBack(Arrays.asList(2));
+		assertTrue(state.isFull());
+
+		SlidingWindowStateIterator<Integer> iterator = state.getIterator();
+
+		SortedSet<Integer> actualSet = new TreeSet<Integer>();
+		while (iterator.hasNext()) {
+			actualSet.add(iterator.next());
+		}
+		assertEquals(getExpectedSet(0, 2), actualSet);
+		actualSet.clear();
+
+		state.pushBack(Arrays.asList(3));
+		assertEquals(false, state.isEmittable());
+		state.pushBack(Arrays.asList(4));
+		assertTrue(state.isEmittable());
+
+		iterator = state.getIterator();
+
+		while (iterator.hasNext()) {
+			actualSet.add(iterator.next());
+		}
+		assertEquals(getExpectedSet(2, 4), actualSet);
+	}
+
+	private SortedSet<Integer> getExpectedSet(int from, int to) {
+		SortedSet<Integer> expectedSet = new TreeSet<Integer>();
+		for (int i = from; i <= to; i++) {
+			expectedSet.add(i);
+		}
+		return expectedSet;
+	}
+
+}