You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/24 21:51:43 UTC

[10/12] git commit: [streaming] added batchReduceGroup, windowReduceGroup functionality to ConnectedDataStream and GroupedConnectedDataStream

[streaming] added batchReduceGroup, windowReduceGroup functionality to ConnectedDataStream and GroupedConnectedDataStream


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ad983377
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ad983377
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ad983377

Branch: refs/heads/master
Commit: ad983377dcc297a137f74a5895b8336e446ef864
Parents: 7337110
Author: szape <ne...@gmail.com>
Authored: Fri Sep 12 11:26:18 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Wed Sep 24 19:54:39 2014 +0200

----------------------------------------------------------------------
 .../api/datastream/ConnectedDataStream.java     | 249 ++++++++++++++++-
 .../datastream/GroupedConnectedDataStream.java  | 268 ++++++++++++++++++-
 .../co/CoBatchGroupReduceInvokable.java         |   2 +-
 .../flink/streaming/state/CircularFifoList.java |  15 +-
 .../flink/streaming/state/StreamIterator.java   |   5 +-
 5 files changed, 511 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ad983377/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 0499c4c..256f470 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -27,13 +27,19 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
+import org.apache.flink.streaming.api.function.co.RichCoReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoBatchGroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoStreamReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
 
@@ -59,7 +65,7 @@ public class ConnectedDataStream<IN1, IN2> {
 		this.jobGraphBuilder = jobGraphBuilder;
 		this.environment = environment;
 		this.input1 = input1.copy();
-		this.input2 = input2.copy();		
+		this.input2 = input2.copy();
 	}
 
 	/**
@@ -82,20 +88,22 @@ public class ConnectedDataStream<IN1, IN2> {
 
 	/**
 	 * Gets the type of the first input
+	 * 
 	 * @return The type of the first input
 	 */
 	public TypeInformation<IN1> getInputType1() {
 		return input1.getOutputType();
 	}
-	
+
 	/**
 	 * Gets the type of the second input
+	 * 
 	 * @return The type of the second input
 	 */
 	public TypeInformation<IN2> getInputType2() {
 		return input2.getOutputType();
 	}
-	
+
 	/**
 	 * GroupBy operation for connected data stream. Groups the elements of
 	 * input1 and input2 according to keyPosition1 and keyPosition2. Used for
@@ -174,12 +182,15 @@ public class ConnectedDataStream<IN1, IN2> {
 	}
 
 	/**
-	 * Applies a reduce transformation on both input of a
-	 * {@link ConnectedDataStream} and maps the output to a common type. The
-	 * transformation calls {@link CoReduceFunction#reduce1} and
-	 * {@link CoReduceFunction#map1} for each element of the first input and
-	 * {@link CoReduceFunction#reduce2} and {@link CoReduceFunction#map2} for
-	 * each element of the second input.
+	 * Applies a reduce transformation on a {@link ConnectedDataStream} and maps
+	 * the outputs to a common type. The transformation calls
+	 * {@link CoReduceFunction#reduce1} and {@link CoReduceFunction#map1} for
+	 * each element of the first input and {@link CoReduceFunction#reduce2} and
+	 * {@link CoReduceFunction#map2} for each element of the second input. This
+	 * type of reduce is much faster than reduceGroup since the reduce function
+	 * can be applied incrementally. The user can also extend the
+	 * {@link RichCoReduceFunction} to gain access to other features provided by
+	 * the {@link RichFunction} interface.
 	 * 
 	 * @param coReducer
 	 *            The {@link CoReduceFunction} that will be called for every
@@ -199,6 +210,226 @@ public class ConnectedDataStream<IN1, IN2> {
 				new CoStreamReduceInvokable<IN1, IN2, OUT>(coReducer));
 	}
 
+	/**
+	 * Applies a reduceGroup transformation on the preset batches of the inputs
+	 * of a {@link ConnectedDataStream}. The transformation calls
+	 * {@link CoGroupReduceFunction#reduce1} for each batch of the first input
+	 * and {@link CoGroupReduceFunction#reduce2} for each batch of the second
+	 * input. Each {@link CoGroupReduceFunction} call can return any number of
+	 * elements including none. When the reducer has run for all the values of a
+	 * batch, the batch is slid forward. The user can also extend
+	 * {@link RichCoGroupReduceFunction} to gain access to other features
+	 * provided by the {@link RichFunction} interface.
+	 * 
+	 * @param coReducer
+	 *            The {@link CoGroupReduceFunction} that will be called for
+	 *            every batch of each input.
+	 * @param batchSize1
+	 *            The number of elements in a batch of the first input.
+	 * @param batchSize2
+	 *            The number of elements in a batch of the second input.
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <OUT> SingleOutputStreamOperator<OUT, ?> batchReduceGroup(
+			CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long batchSize1, long batchSize2) {
+		return batchReduceGroup(coReducer, batchSize1, batchSize2, batchSize1, batchSize2);
+	}
+
+	/**
+	 * Applies a reduceGroup transformation on the preset batches of the inputs
+	 * of a {@link ConnectedDataStream}. The transformation calls
+	 * {@link CoGroupReduceFunction#reduce1} for each batch of the first input
+	 * and {@link CoGroupReduceFunction#reduce2} for each batch of the second
+	 * input. Each {@link CoGroupReduceFunction} call can return any number of
+	 * elements including none. When the reducer has run for all the values of a
+	 * batch, the batch is slid forward. The user can also extend
+	 * {@link RichCoGroupReduceFunction} to gain access to other features
+	 * provided by the {@link RichFunction} interface.
+	 * 
+	 * @param coReducer
+	 *            The {@link CoGroupReduceFunction} that will be called for
+	 *            every batch of each input.
+	 * @param batchSize1
+	 *            The number of elements in a batch of the first input.
+	 * @param batchSize2
+	 *            The number of elements in a batch of the second input.
+	 * @param slideSize1
+	 *            The number of elements a batch of the first input is slid by.
+	 * @param slideSize2
+	 *            The number of elements a batch of the second input is slid by.
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <OUT> SingleOutputStreamOperator<OUT, ?> batchReduceGroup(
+			CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long batchSize1, long batchSize2,
+			long slideSize1, long slideSize2) {
+
+		if (batchSize1 < 1 || batchSize2 < 1) {
+			throw new IllegalArgumentException("Batch size must be positive");
+		}
+		if (slideSize1 < 1 || slideSize2 < 1) {
+			throw new IllegalArgumentException("Slide size must be positive");
+		}
+		if (batchSize1 < slideSize1 || batchSize2 < slideSize2) {
+			throw new IllegalArgumentException("Batch size must be at least slide size");
+		}
+
+		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
+				CoGroupReduceFunction.class, 0);
+		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
+				CoGroupReduceFunction.class, 1);
+		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
+				CoGroupReduceFunction.class, 2);
+
+		return addCoFunction("coBatchReduce", coReducer, in1TypeWrapper, in2TypeWrapper,
+				outTypeWrapper, new CoBatchGroupReduceInvokable<IN1, IN2, OUT>(coReducer,
+						batchSize1, batchSize2, slideSize1, slideSize2));
+	}
+
+	/**
+	 * Applies a reduceGroup transformation on the preset time windows of the
+	 * inputs of a {@link ConnectedDataStream}. The transformation calls
+	 * {@link CoGroupReduceFunction#reduce1} for each window of the first input
+	 * and {@link CoGroupReduceFunction#reduce2} for each window of the second
+	 * input. Each {@link CoGroupReduceFunction} call can return any number of
+	 * elements including none. When the reducer has run for all the values of a
+	 * window, the window is slid forward. The user can also extend
+	 * {@link RichCoGroupReduceFunction} to gain access to other features
+	 * provided by the {@link RichFunction} interface.
+	 * 
+	 * @param coReducer
+	 *            The {@link CoGroupReduceFunction} that will be called for
+	 *            every window of each input.
+	 * @param windowSize1
+	 *            The size of the time window of the first input.
+	 * @param windowSize2
+	 *            The size of the time window of the second input.
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
+			CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2) {
+		return windowReduceGroup(coReducer, windowSize1, windowSize2, windowSize1, windowSize2);
+	}
+
+	/**
+	 * Applies a reduceGroup transformation on the preset time windows of the
+	 * inputs of a {@link ConnectedDataStream}. The transformation calls
+	 * {@link CoGroupReduceFunction#reduce1} for each window of the first input
+	 * and {@link CoGroupReduceFunction#reduce2} for each window of the second
+	 * input. Each {@link CoGroupReduceFunction} call can return any number of
+	 * elements including none. When the reducer has run for all the values of a
+	 * window, the window is slid forward. The user can also extend
+	 * {@link RichCoGroupReduceFunction} to gain access to other features
+	 * provided by the {@link RichFunction} interface.
+	 * 
+	 * @param coReducer
+	 *            The {@link CoGroupReduceFunction} that will be called for
+	 *            every window of each input.
+	 * @param windowSize1
+	 *            The size of the time window of the first input.
+	 * @param windowSize2
+	 *            The size of the time window of the second input.
+	 * @param slideInterval1
+	 *            The time interval a window of the first input is slid by.
+	 * @param slideInterval2
+	 *            The time interval a window of the second input is slid by.
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
+			CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2,
+			long slideInterval1, long slideInterval2) {
+		return windowReduceGroup(coReducer, windowSize1, windowSize2, slideInterval1,
+				slideInterval2, new DefaultTimeStamp<IN1>(), new DefaultTimeStamp<IN2>());
+	}
+
+	/**
+	 * Applies a reduceGroup transformation on the preset time windows of the
+	 * inputs of a {@link ConnectedDataStream}, where the time is provided by
+	 * timestamps. The transformation calls
+	 * {@link CoGroupReduceFunction#reduce1} for each window of the first input
+	 * and {@link CoGroupReduceFunction#reduce2} for each window of the second
+	 * input. Each {@link CoGroupReduceFunction} call can return any number of
+	 * elements including none. When the reducer has run for all the values of a
+	 * window, the window is slid forward. The user can also extend
+	 * {@link RichCoGroupReduceFunction} to gain access to other features
+	 * provided by the {@link RichFunction} interface.
+	 * 
+	 * @param coReducer
+	 *            The {@link CoGroupReduceFunction} that will be called for
+	 *            every window of each input.
+	 * @param windowSize1
+	 *            The size of the time window of the first input.
+	 * @param windowSize2
+	 *            The size of the time window of the second input.
+	 * @param timestamp1
+	 *            The predefined timestamp function of the first input.
+	 * @param timestamp2
+	 *            The predefined timestamp function of the second input.
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
+			CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2,
+			TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
+		return windowReduceGroup(coReducer, windowSize1, windowSize2, windowSize1, windowSize2,
+				timestamp1, timestamp2);
+	}
+
+	/**
+	 * Applies a reduceGroup transformation on the preset time windows of the
+	 * inputs of a {@link ConnectedDataStream}, where the time is provided by
+	 * timestamps. The transformation calls
+	 * {@link CoGroupReduceFunction#reduce1} for each window of the first input
+	 * and {@link CoGroupReduceFunction#reduce2} for each window of the second
+	 * input. Each {@link CoGroupReduceFunction} call can return any number of
+	 * elements including none. When the reducer has run for all the values of a
+	 * window, the window is slid forward. The user can also extend
+	 * {@link RichCoGroupReduceFunction} to gain access to other features
+	 * provided by the {@link RichFunction} interface.
+	 * 
+	 * @param coReducer
+	 *            The {@link CoGroupReduceFunction} that will be called for
+	 *            every window of each input.
+	 * @param windowSize1
+	 *            The size of the time window of the first input.
+	 * @param windowSize2
+	 *            The size of the time window of the second input.
+	 * @param slideInterval1
+	 *            The time interval a window of the first input is slid by.
+	 * @param slideInterval2
+	 *            The time interval a window of the second input is slid by.
+	 * @param timestamp1
+	 *            The predefined timestamp function of the first input.
+	 * @param timestamp2
+	 *            The predefined timestamp function of the second input.
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
+			CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2,
+			long slideInterval1, long slideInterval2, TimeStamp<IN1> timestamp1,
+			TimeStamp<IN2> timestamp2) {
+
+		if (windowSize1 < 1 || windowSize2 < 1) {
+			throw new IllegalArgumentException("Window size must be positive");
+		}
+		if (slideInterval1 < 1 || slideInterval2 < 1) {
+			throw new IllegalArgumentException("Slide interval must be positive");
+		}
+		if (windowSize1 < slideInterval1 || windowSize2 < slideInterval2) {
+			throw new IllegalArgumentException("Window size must be at least slide interval");
+		}
+
+		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
+				CoGroupReduceFunction.class, 0);
+		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
+				CoGroupReduceFunction.class, 1);
+		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
+				CoGroupReduceFunction.class, 2);
+
+		return addCoFunction("coWindowReduce", coReducer, in1TypeWrapper, in2TypeWrapper,
+				outTypeWrapper, new CoWindowGroupReduceInvokable<IN1, IN2, OUT>(coReducer,
+						windowSize1, windowSize2, slideInterval1, slideInterval2, timestamp1,
+						timestamp2));
+	}
+
 	protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
 			final Function function, TypeSerializerWrapper<IN1> in1TypeWrapper,
 			TypeSerializerWrapper<IN2> in2TypeWrapper, TypeSerializerWrapper<OUT> outTypeWrapper,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ad983377/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java
index 59dd2fa..0b6a7b5 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java
@@ -19,8 +19,14 @@ package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+import org.apache.flink.streaming.api.function.co.RichCoReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchGroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 
 public class GroupedConnectedDataStream<IN1, IN2> extends ConnectedDataStream<IN1, IN2> {
@@ -37,20 +43,24 @@ public class GroupedConnectedDataStream<IN1, IN2> extends ConnectedDataStream<IN
 	}
 
 	/**
-	 * Applies a CoReduce transformation on a {@link ConnectedDataStream}
-	 * grouped by the given key position and maps the output to a common type.
-	 * The {@link CoReduceFunction} will receive input values based on the key
-	 * positions. The transformation calls {@link CoReduceFunction#reduce1} and
-	 * {@link CoReduceFunction#map1} for each element of the first input and
-	 * {@link CoReduceFunction#reduce2} and {@link CoReduceFunction#map2} for
-	 * each element of the second input. For each input, only values with the
-	 * same key will go to the same reducer.
+	 * Applies a reduce transformation on a {@link GroupedConnectedDataStream},
+	 * and maps the outputs to a common type. The transformation calls
+	 * {@link CoReduceFunction#reduce1} and {@link CoReduceFunction#map1} for
+	 * each element of the first input and {@link CoReduceFunction#reduce2} and
+	 * {@link CoReduceFunction#map2} for each element of the second input. For
+	 * both inputs, the reducer is applied on every group of elements sharing
+	 * the same key at the respective position. This type of reduce is much
+	 * faster than reduceGroup since the reduce function can be applied
+	 * incrementally. The user can also extend the {@link RichCoReduceFunction}
+	 * to gain access to other features provided by the {@link RichFunction}
+	 * interface.
 	 * 
 	 * @param coReducer
 	 *            The {@link CoReduceFunction} that will be called for every
-	 *            element with the same key of each input DataStream.
-	 * @return The transformed DataStream.
+	 *            element of the inputs.
+	 * @return The transformed {@link DataStream}.
 	 */
+	@Override
 	public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
 
 		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
@@ -64,4 +74,242 @@ public class GroupedConnectedDataStream<IN1, IN2> extends ConnectedDataStream<IN
 				new CoGroupedReduceInvokable<IN1, IN2, OUT>(coReducer, keyPosition1, keyPosition2));
 	}
 
+	/**
+	 * Applies a reduceGroup transformation on the preset batches of the inputs
+	 * of a {@link GroupedConnectedDataStream}. The transformation calls
+	 * {@link CoGroupReduceFunction#reduce1} for each batch of the first input
+	 * and {@link CoGroupReduceFunction#reduce2} for each batch of the second
+	 * input. For both inputs, the reducer is applied on every group of elements
+	 * of every batch sharing the same key at the respective position. Each
+	 * {@link CoGroupReduceFunction} call can return any number of elements
+	 * including none. When the reducer has run for all the values of a batch,
+	 * the batch is slid forward. The user can also extend
+	 * {@link RichCoGroupReduceFunction} to gain access to other features
+	 * provided by the {@link RichFunction} interface.
+	 * 
+	 * @param coReducer
+	 *            The {@link CoGroupReduceFunction} that will be called for
+	 *            every batch of each input.
+	 * @param batchSize1
+	 *            The number of elements in a batch of the first input.
+	 * @param batchSize2
+	 *            The number of elements in a batch of the second input.
+	 * @return The transformed {@link DataStream}.
+	 */
+	@Override
+	public <OUT> SingleOutputStreamOperator<OUT, ?> batchReduceGroup(
+			CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long batchSize1, long batchSize2) {
+		return batchReduceGroup(coReducer, batchSize1, batchSize2, batchSize1, batchSize2);
+	}
+
+	/**
+	 * Applies a reduceGroup transformation on the preset batches of the inputs
+	 * of a {@link GroupedConnectedDataStream}. The transformation calls
+	 * {@link CoGroupReduceFunction#reduce1} for each batch of the first input
+	 * and {@link CoGroupReduceFunction#reduce2} for each batch of the second
+	 * input. For both inputs, the reducer is applied on every group of elements
+	 * of every batch sharing the same key at the respective position. Each
+	 * {@link CoGroupReduceFunction} call can return any number of elements
+	 * including none. When the reducer has run for all the values of a batch,
+	 * the batch is slid forward. The user can also extend
+	 * {@link RichCoGroupReduceFunction} to gain access to other features
+	 * provided by the {@link RichFunction} interface.
+	 * 
+	 * @param coReducer
+	 *            The {@link CoGroupReduceFunction} that will be called for
+	 *            every batch of each input.
+	 * @param batchSize1
+	 *            The number of elements in a batch of the first input.
+	 * @param batchSize2
+	 *            The number of elements in a batch of the second input.
+	 * @param slideSize1
+	 *            The number of elements a batch of the first input is slid by.
+	 * @param slideSize2
+	 *            The number of elements a batch of the second input is slid by.
+	 * @return The transformed {@link DataStream}.
+	 */
+	@Override
+	public <OUT> SingleOutputStreamOperator<OUT, ?> batchReduceGroup(
+			CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long batchSize1, long batchSize2,
+			long slideSize1, long slideSize2) {
+
+		if (batchSize1 < 1 || batchSize2 < 1) {
+			throw new IllegalArgumentException("Batch size must be positive");
+		}
+		if (slideSize1 < 1 || slideSize2 < 1) {
+			throw new IllegalArgumentException("Slide size must be positive");
+		}
+		if (batchSize1 < slideSize1 || batchSize2 < slideSize2) {
+			throw new IllegalArgumentException("Batch size must be at least slide size");
+		}
+
+		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
+				CoGroupReduceFunction.class, 0);
+		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
+				CoGroupReduceFunction.class, 1);
+		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
+				CoGroupReduceFunction.class, 2);
+
+		return addCoFunction("coBatchReduce", coReducer, in1TypeWrapper, in2TypeWrapper,
+				outTypeWrapper, new CoGroupedBatchGroupReduceInvokable<IN1, IN2, OUT>(coReducer,
+						batchSize1, batchSize2, slideSize1, slideSize2, keyPosition1, keyPosition2));
+	}
+
+	/**
+	 * Applies a reduceGroup transformation on the preset time windows of the
+	 * inputs of a {@link GroupedConnectedDataStream}. The transformation calls
+	 * {@link CoGroupReduceFunction#reduce1} for each window of the first input
+	 * and {@link CoGroupReduceFunction#reduce2} for each window of the second
+	 * input. For both inputs, the reducer is applied on every group of elements
+	 * of every window sharing the same key at the respective position. Each
+	 * {@link CoGroupReduceFunction} call can return any number of elements
+	 * including none. When the reducer has run for all the values of a window,
+	 * the window is slid forward. The user can also extend
+	 * {@link RichCoGroupReduceFunction} to gain access to other features
+	 * provided by the {@link RichFunction} interface.
+	 * 
+	 * @param coReducer
+	 *            The {@link CoGroupReduceFunction} that will be called for
+	 *            every window of each input.
+	 * @param windowSize1
+	 *            The size of the time window of the first input.
+	 * @param windowSize2
+	 *            The size of the time window of the second input.
+	 * @return The transformed {@link DataStream}.
+	 */
+	@Override
+	public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
+			CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2) {
+		return windowReduceGroup(coReducer, windowSize1, windowSize2, windowSize1, windowSize2);
+	}
+
+	/**
+	 * Applies a reduceGroup transformation on the preset time windows of the
+	 * inputs of a {@link GroupedConnectedDataStream}. The transformation calls
+	 * {@link CoGroupReduceFunction#reduce1} for each window of the first input
+	 * and {@link CoGroupReduceFunction#reduce2} for each window of the second
+	 * input. For both inputs, the reducer is applied on every group of elements
+	 * of every window sharing the same key at the respective position. Each
+	 * {@link CoGroupReduceFunction} call can return any number of elements
+	 * including none. When the reducer has run for all the values of a window,
+	 * the window is slid forward. The user can also extend
+	 * {@link RichCoGroupReduceFunction} to gain access to other features
+	 * provided by the {@link RichFunction} interface.
+	 * 
+	 * @param coReducer
+	 *            The {@link CoGroupReduceFunction} that will be called for
+	 *            every window of each input.
+	 * @param windowSize1
+	 *            The size of the time window of the first input.
+	 * @param windowSize2
+	 *            The size of the time window of the second input.
+	 * @param slideInterval1
+	 *            The time interval a window of the first input is slid by.
+	 * @param slideInterval2
+	 *            The time interval a window of the second input is slid by.
+	 * @return The transformed {@link DataStream}.
+	 */
+	@Override
+	public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
+			CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2,
+			long slideInterval1, long slideInterval2) {
+		return windowReduceGroup(coReducer, windowSize1, windowSize2, slideInterval1,
+				slideInterval2, new DefaultTimeStamp<IN1>(), new DefaultTimeStamp<IN2>());
+	}
+
+	/**
+	 * Applies a reduceGroup transformation on the preset time windows of the
+	 * inputs of a {@link GroupedConnectedDataStream}, where the time is
+	 * provided by timestamps. The transformation calls
+	 * {@link CoGroupReduceFunction#reduce1} for each window of the first input
+	 * and {@link CoGroupReduceFunction#reduce2} for each window of the second
+	 * input. For both inputs, the reducer is applied on every group of elements
+	 * of every window sharing the same key at the respective position. Each
+	 * {@link CoGroupReduceFunction} call can return any number of elements
+	 * including none. When the reducer has run for all the values of a window,
+	 * the window is slid forward. The user can also extend
+	 * {@link RichCoGroupReduceFunction} to gain access to other features
+	 * provided by the {@link RichFunction} interface.
+	 * 
+	 * @param coReducer
+	 *            The {@link CoGroupReduceFunction} that will be called for
+	 *            every window of each input.
+	 * @param windowSize1
+	 *            The size of the time window of the first input.
+	 * @param windowSize2
+	 *            The size of the time window of the second input.
+	 * @param timestamp1
+	 *            The predefined timestamp function of the first input.
+	 * @param timestamp2
+	 *            The predefined timestamp function of the second input.
+	 * @return The transformed {@link DataStream}.
+	 */
+	@Override
+	public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
+			CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2,
+			TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
+		return windowReduceGroup(coReducer, windowSize1, windowSize2, windowSize1, windowSize2,
+				timestamp1, timestamp2);
+	}
+
+	/**
+	 * Applies a reduceGroup transformation on the preset time windows of the
+	 * inputs of a {@link GroupedConnectedDataStream}, where the time is
+	 * provided by timestamps. The transformation calls
+	 * {@link CoGroupReduceFunction#reduce1} for each window of the first input
+	 * and {@link CoGroupReduceFunction#reduce2} for each window of the second
+	 * input. For both inputs, the reducer is applied on every group of elements
+	 * of every window sharing the same key at the respective position. Each
+	 * {@link CoGroupReduceFunction} call can return any number of elements
+	 * including none. When the reducer has run for all the values of a window,
+	 * the window is slid forward. The user can also extend
+	 * {@link RichCoGroupReduceFunction} to gain access to other features
+	 * provided by the {@link RichFunction} interface.
+	 * 
+	 * @param coReducer
+	 *            The {@link CoGroupReduceFunction} that will be called for
+	 *            every window of each input.
+	 * @param windowSize1
+	 *            The size of the time window of the first input.
+	 * @param windowSize2
+	 *            The size of the time window of the second input.
+	 * @param slideInterval1
+	 *            The time interval a window of the first input is slid by.
+	 * @param slideInterval2
+	 *            The time interval a window of the second input is slid by.
+	 * @param timestamp1
+	 *            The predefined timestamp function of the first input.
+	 * @param timestamp2
+	 *            The predefined timestamp function of the second input.
+	 * @return The transformed {@link DataStream}.
+	 */
+	@Override
+	public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
+			CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2,
+			long slideInterval1, long slideInterval2, TimeStamp<IN1> timestamp1,
+			TimeStamp<IN2> timestamp2) {
+
+		if (windowSize1 < 1 || windowSize2 < 1) {
+			throw new IllegalArgumentException("Window size must be positive");
+		}
+		if (slideInterval1 < 1 || slideInterval2 < 1) {
+			throw new IllegalArgumentException("Slide interval must be positive");
+		}
+		if (windowSize1 < slideInterval1 || windowSize2 < slideInterval2) {
+			throw new IllegalArgumentException("Window size must be at least slide interval");
+		}
+
+		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
+				CoGroupReduceFunction.class, 0);
+		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
+				CoGroupReduceFunction.class, 1);
+		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
+				CoGroupReduceFunction.class, 2);
+
+		return addCoFunction("coWindowReduce", coReducer, in1TypeWrapper, in2TypeWrapper,
+				outTypeWrapper, new CoGroupedWindowGroupReduceInvokable<IN1, IN2, OUT>(coReducer,
+						windowSize1, windowSize2, slideInterval1, slideInterval2, keyPosition1,
+						keyPosition2, timestamp1, timestamp2));
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ad983377/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchGroupReduceInvokable.java
index 334caa7..07e3d6c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchGroupReduceInvokable.java
@@ -21,8 +21,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
 
 public class CoBatchGroupReduceInvokable<IN1, IN2, OUT> extends CoGroupReduceInvokable<IN1, IN2, OUT> {
-
 	private static final long serialVersionUID = 1L;
+	
 	protected long startCounter1;
 	protected long startCounter2;
 	protected long endCounter1;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ad983377/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java
index 1a6d678..23dcb57 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java
@@ -17,8 +17,7 @@
 
 package org.apache.flink.streaming.state;
 
-import java.util.ArrayDeque;
-import java.util.Deque;
+import java.io.Serializable;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Queue;
@@ -28,14 +27,16 @@ import java.util.Queue;
  * queue if full and a new element is added, the elements that belong to the
  * first sliding interval are removed.
  */
-public class CircularFifoList<T> {
+public class CircularFifoList<T> implements Serializable {
+	private static final long serialVersionUID = 1L;
+
 	private Queue<T> queue;
-	private Deque<Long> slideSizes;
+	private Queue<Long> slideSizes;
 	private long counter;
 
 	public CircularFifoList() {
 		this.queue = new LinkedList<T>();
-		this.slideSizes = new ArrayDeque<Long>();
+		this.slideSizes = new LinkedList<Long>();
 		this.counter = 0;
 	}
 
@@ -50,10 +51,10 @@ public class CircularFifoList<T> {
 	}
 
 	public void shiftWindow() {
-		for (int i = 0; i < slideSizes.getFirst(); i++) {
+		Long firstSlideSize = slideSizes.remove();
+		for (int i = 0; i < firstSlideSize; i++) {
 			queue.remove();
 		}
-		slideSizes.remove();
 	}
 
 	public Iterator<T> getIterator() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ad983377/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StreamIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StreamIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StreamIterator.java
index 7a7a07a..ed1cd80 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StreamIterator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StreamIterator.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.state;
 
+import java.io.Serializable;
 import java.util.Iterator;
 
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -25,7 +26,9 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
  * Simple wrapper class to convert an Iterator<StreamRecord<T>> to an
  * Iterator<T> iterator by invoking the getObject() method on every element.
  */
-public class StreamIterator<T> implements Iterator<T> {
+public class StreamIterator<T> implements Iterator<T>, Serializable {
+	private static final long serialVersionUID = 1L;
+
 	private Iterator<StreamRecord<T>> iterator = null;
 
 	public void load(Iterator<StreamRecord<T>> iterator) {