You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/20 15:10:59 UTC

[16/18] git commit: [streaming] Added CoBatchGroupReduceInvokable, CoWindowGroupReduceInvokable and grouped variants

[streaming] Added CoBatchGroupReduceInvokable, CoWindowGroupReduceInvokable and grouped variants


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b6ffdbad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b6ffdbad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b6ffdbad

Branch: refs/heads/master
Commit: b6ffdbad1e6779d8c02e1d0ac77ae6da4ad4a216
Parents: d0dd513
Author: szape <ne...@gmail.com>
Authored: Wed Sep 10 10:41:28 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Sat Sep 20 13:44:12 2014 +0200

----------------------------------------------------------------------
 .../api/datastream/ConnectedDataStream.java     |   4 +-
 .../datastream/GroupedConnectedDataStream.java  |   4 +-
 .../api/datastream/GroupedDataStream.java       |   6 +-
 .../operator/GroupReduceInvokable.java          |  56 ----
 .../operator/GroupedReduceInvokable.java        |  56 ++++
 .../co/CoBatchGroupReduceInvokable.java         | 117 +++++++++
 .../operator/co/CoGroupReduceInvokable.java     | 140 ++++++----
 .../co/CoGroupedBatchGroupReduceInvokable.java  |  99 +++++++
 .../operator/co/CoGroupedReduceInvokable.java   |  85 ++++++
 .../co/CoGroupedWindowGroupReduceInvokable.java | 102 ++++++++
 .../api/invokable/operator/co/CoInvokable.java  |  14 +-
 .../operator/co/CoReduceInvokable.java          |  70 -----
 .../operator/co/CoStreamReduceInvokable.java    |  70 +++++
 .../co/CoWindowGroupReduceInvokable.java        |  98 +++++++
 .../flink/streaming/state/CircularFifoList.java |  67 +++++
 .../flink/streaming/state/StreamIterator.java   |  55 ++++
 .../streaming/api/AggregationFunctionTest.java  |   8 +-
 .../operator/CoBatchGroupReduceTest.java        |  96 +++++++
 .../api/invokable/operator/CoFlatMapTest.java   |   4 +-
 .../invokable/operator/CoGroupReduceTest.java   |  96 -------
 .../operator/CoGroupedBatchGroupReduceTest.java | 116 +++++++++
 .../invokable/operator/CoGroupedReduceTest.java |  94 +++++++
 .../CoGroupedWindowGroupReduceTest.java         | 223 ++++++++++++++++
 .../api/invokable/operator/CoReduceTest.java    |  71 -----
 .../invokable/operator/CoStreamReduceTest.java  |  71 +++++
 .../operator/CoWindowGroupReduceTest.java       | 256 +++++++++++++++++++
 .../operator/GroupReduceInvokableTest.java      |  54 ----
 .../operator/GroupedReduceInvokableTest.java    |  54 ++++
 28 files changed, 1782 insertions(+), 404 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index e00919f..9529dcd 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoStreamReduceInvokable;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
 import org.apache.flink.types.TypeInformation;
@@ -196,7 +196,7 @@ public class ConnectedDataStream<IN1, IN2> {
 				CoReduceFunction.class, 2);
 
 		return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
-				new CoReduceInvokable<IN1, IN2, OUT>(coReducer));
+				new CoStreamReduceInvokable<IN1, IN2, OUT>(coReducer));
 	}
 
 	protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java
index 8276561..8a16b98 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.datastream;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 
 public class GroupedConnectedDataStream<IN1, IN2> extends ConnectedDataStream<IN1, IN2> {
@@ -61,7 +61,7 @@ public class GroupedConnectedDataStream<IN1, IN2> extends ConnectedDataStream<IN
 				CoReduceFunction.class, 2);
 
 		return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
-				new CoGroupReduceInvokable<IN1, IN2, OUT>(coReducer, keyPosition1, keyPosition2));
+				new CoGroupedReduceInvokable<IN1, IN2, OUT>(coReducer, keyPosition1, keyPosition2));
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 138a6f8..30826d3 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.invokable.operator.GroupedBatchGroupReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.GroupedWindowGroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
@@ -69,7 +69,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
 		return addFunction("groupReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
 				ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
-				ReduceFunction.class, 0), new GroupReduceInvokable<OUT>(reducer, keyPosition));
+				ReduceFunction.class, 0), new GroupedReduceInvokable<OUT>(reducer, keyPosition));
 	}
 
 	/**
@@ -240,7 +240,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	@Override
 	protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
 
-		GroupReduceInvokable<OUT> invokable = new GroupReduceInvokable<OUT>(aggregate, keyPosition);
+		GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(aggregate, keyPosition);
 
 		SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("groupReduce", aggregate,
 				null, null, invokable);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
deleted file mode 100755
index 73523d9..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.state.MutableTableState;
-
-public class GroupReduceInvokable<IN> extends StreamReduceInvokable<IN> {
-	private static final long serialVersionUID = 1L;
-
-	private int keyPosition;
-	private MutableTableState<Object, IN> values;
-	private IN reduced;
-
-	public GroupReduceInvokable(ReduceFunction<IN> reducer, int keyPosition) {
-		super(reducer);
-		this.keyPosition = keyPosition;
-		values = new MutableTableState<Object, IN>();
-	}
-
-	@Override
-	protected void reduce() throws Exception {
-		Object key = reuse.getField(keyPosition);
-		currentValue = values.get(key);
-		nextValue = reuse.getObject();
-		if (currentValue != null) {
-			callUserFunctionAndLogException();
-			values.put(key, reduced);
-			collector.collect(reduced);
-		} else {
-			values.put(key, nextValue);
-			collector.collect(nextValue);
-		}
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		reduced = reducer.reduce(currentValue, nextValue);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
new file mode 100755
index 0000000..6423492
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.state.MutableTableState;
+
+public class GroupedReduceInvokable<IN> extends StreamReduceInvokable<IN> { // reduce that keeps one running value per key
+	private static final long serialVersionUID = 1L;
+
+	private int keyPosition; // field index passed to getField() to obtain the grouping key
+	private MutableTableState<Object, IN> values; // key -> value stored by the last reduce() for that key
+	private IN reduced; // output of the most recent reducer.reduce() call
+
+	public GroupedReduceInvokable(ReduceFunction<IN> reducer, int keyPosition) { // reducer is handed to the superclass; keyPosition selects the key field
+		super(reducer);
+		this.keyPosition = keyPosition;
+		values = new MutableTableState<Object, IN>();
+	}
+
+	@Override
+	protected void reduce() throws Exception { // handles the record currently held in `reuse`
+		Object key = reuse.getField(keyPosition);
+		currentValue = values.get(key); // presumably null for a key that was never stored; the branch below relies on it
+		nextValue = reuse.getObject();
+		if (currentValue != null) {
+			callUserFunctionAndLogException(); // NOTE(review): if this logs and swallows a failure, `reduced` still holds the previous result - confirm
+			values.put(key, reduced);
+			collector.collect(reduced);
+		} else { // nothing stored for this key yet: store and emit the record's value unchanged
+			values.put(key, nextValue);
+			collector.collect(nextValue);
+		}
+	}
+
+	@Override
+	protected void callUserFunction() throws Exception { // combines the stored value with the new one; result is kept in `reduced`
+		reduced = reducer.reduce(currentValue, nextValue);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchGroupReduceInvokable.java
new file mode 100644
index 0000000..1cdf8c7
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchGroupReduceInvokable.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.co;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
+
+public class CoBatchGroupReduceInvokable<IN1, IN2, OUT> extends CoGroupReduceInvokable<IN1, IN2, OUT> { // window boundaries for both inputs are decided by counters
+
+	private static final long serialVersionUID = 1L;
+	protected long startCounter1; // incremented in initialize1(); reset to 0 when it reaches slideInterval1
+	protected long startCounter2; // incremented in initialize2(); reset to 0 when it reaches slideInterval2
+	protected long endCounter1; // incremented in initialize1(); reduced by slideInterval1 when it reaches windowSize1
+	protected long endCounter2; // incremented in initialize2(); reduced by slideInterval2 when it reaches windowSize2
+
+	public CoBatchGroupReduceInvokable(CoGroupReduceFunction<IN1, IN2, OUT> reduceFunction,
+			long windowSize1, long windowSize2, long slideInterval1, long slideInterval2) { // all arguments are forwarded unchanged to the superclass
+		super(reduceFunction, windowSize1, windowSize2, slideInterval1, slideInterval2);
+	}
+
+	@Override
+	protected void handleStream1() throws Exception { // stores the current first-input record, then evaluates the window conditions
+		circularList1.add(reuse1); // the record is added before the checks, so it is part of a window that ends now
+		if (windowStart1()) {
+			circularList1.newSlide();
+		}
+		if (windowEnd1()) {
+			reduce1();
+			circularList1.shiftWindow(); // called only after reduce1() has run on the current contents
+		}
+	}
+
+	@Override
+	protected void handleStream2() throws Exception { // same steps as handleStream1(), applied to the second input
+		circularList2.add(reuse2);
+		if (windowStart2()) {
+			circularList2.newSlide();
+		}
+		if (windowEnd2()) {
+			reduce2();
+			circularList2.shiftWindow();
+		}
+	}
+
+	@Override
+	protected boolean windowStart1() throws Exception { // true (and counter reset) once startCounter1 equals slideInterval1
+		if (startCounter1 == slideInterval1) {
+			startCounter1 = 0;
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	protected boolean windowStart2() throws Exception { // true (and counter reset) once startCounter2 equals slideInterval2
+		if (startCounter2 == slideInterval2) {
+			startCounter2 = 0;
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	protected boolean windowEnd1() throws Exception { // true once endCounter1 equals windowSize1
+		if (endCounter1 == windowSize1) {
+			endCounter1 -= slideInterval1; // leaves the counter slideInterval1 increments short of the next window end
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	protected boolean windowEnd2() throws Exception { // true once endCounter2 equals windowSize2
+		if (endCounter2 == windowSize2) {
+			endCounter2 -= slideInterval2; // leaves the counter slideInterval2 increments short of the next window end
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception { // runs the superclass open(), then zeroes all four counters
+		super.open(parameters);
+		startCounter1 = 0;
+		startCounter2 = 0;
+		endCounter1 = 0;
+		endCounter2 = 0;
+	}
+
+	@Override
+	protected void initialize1() { // presumably invoked by CoInvokable once per first-input record - TODO confirm against the caller
+		startCounter1++;
+		endCounter1++;
+	}
+
+	@Override
+	protected void initialize2() { // presumably invoked by CoInvokable once per second-input record - TODO confirm against the caller
+		startCounter2++;
+		endCounter2++;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
index ab827d8..9700c70 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
@@ -17,69 +17,123 @@
 
 package org.apache.flink.streaming.api.invokable.operator.co;
 
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.state.MutableTableState;
+import java.util.Iterator;
 
-public class CoGroupReduceInvokable<IN1, IN2, OUT> extends CoReduceInvokable<IN1, IN2, OUT> {
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.state.CircularFifoList;
+import org.apache.flink.streaming.state.StreamIterator;
+
+public abstract class CoGroupReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
 	private static final long serialVersionUID = 1L;
 
-	private int keyPosition1;
-	private int keyPosition2;
-	private MutableTableState<Object, IN1> values1;
-	private MutableTableState<Object, IN2> values2;
-	IN1 reduced1;
-	IN2 reduced2;
-
-	public CoGroupReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, int keyPosition1,
-			int keyPosition2) {
-		super(coReducer);
-		this.coReducer = coReducer;
-		this.keyPosition1 = keyPosition1;
-		this.keyPosition2 = keyPosition2;
-		values1 = new MutableTableState<Object, IN1>();
-		values2 = new MutableTableState<Object, IN2>();
+	protected CoGroupReduceFunction<IN1, IN2, OUT> coReducer;
+	protected StreamIterator<IN1> userIterator1;
+	protected StreamIterator<IN2> userIterator2;
+	protected Iterable<IN1> userIterable1;
+	protected Iterable<IN2> userIterable2;
+	protected long windowSize1;
+	protected long windowSize2;
+	protected long slideInterval1;
+	protected long slideInterval2;
+	protected CircularFifoList<StreamRecord<IN1>> circularList1;
+	protected CircularFifoList<StreamRecord<IN2>> circularList2;
+	protected long WindowStartTime1;
+	protected long WindowStartTime2;
+	protected long WindowEndTime1;
+	protected long WindowEndTime2;
+
+	public CoGroupReduceInvokable(CoGroupReduceFunction<IN1, IN2, OUT> reduceFunction,
+			long windowSize1, long windowSize2, long slideInterval1, long slideInterval2) {
+		super(reduceFunction);
+		this.coReducer = reduceFunction;
+		this.userIterator1 = new StreamIterator<IN1>();
+		this.userIterator2 = new StreamIterator<IN2>();
+		this.windowSize1 = windowSize1;
+		this.windowSize2 = windowSize2;
+		this.slideInterval1 = slideInterval1;
+		this.slideInterval2 = slideInterval2;
+		this.circularList1 = new CircularFifoList<StreamRecord<IN1>>();
+		this.circularList2 = new CircularFifoList<StreamRecord<IN2>>();
+	}
+
+	@Override
+	protected void mutableInvoke() throws Exception {
+		throw new RuntimeException("Reducing mutable sliding batch is not supported.");
 	}
 
 	@Override
-	public void handleStream1() throws Exception {
-		Object key = reuse1.getField(keyPosition1);
-		currentValue1 = values1.get(key);
-		nextValue1 = reuse1.getObject();
-		if (currentValue1 != null) {
-			callUserFunctionAndLogException1();
-			values1.put(key, reduced1);
-			collector.collect(coReducer.map1(reduced1));
-		} else {
-			values1.put(key, nextValue1);
-			collector.collect(coReducer.map1(nextValue1));
+	protected void handleStream1() throws Exception {
+		while (windowStart1()) {
+			circularList1.newSlide();
+		}
+		while (windowEnd1()) {
+			reduce1();
+			circularList1.shiftWindow();
 		}
+		circularList1.add(reuse1);
 	}
 
 	@Override
-	public void handleStream2() throws Exception {
-		Object key = reuse2.getField(keyPosition2);
-		currentValue2 = values2.get(key);
-		nextValue2 = reuse2.getObject();
-		if (currentValue2 != null) {
-			callUserFunctionAndLogException2();
-			values2.put(key, reduced2);
-			collector.collect(coReducer.map2(reduced2));
-		} else {
-			values2.put(key, nextValue2);
-			collector.collect(coReducer.map2(nextValue2));
+	protected void handleStream2() throws Exception {
+		while (windowStart2()) {
+			circularList2.newSlide();
 		}
+		while (windowEnd2()) {
+			reduce2();
+			circularList2.shiftWindow();
+		}
+		circularList2.add(reuse2);
+	}
+
+	protected void reduce1() throws Exception {
+		userIterator1.load(circularList1.getIterator());
+		callUserFunctionAndLogException1();
+	}
+
+	protected void reduce2() throws Exception {
+		userIterator2.load(circularList2.getIterator());
+		callUserFunctionAndLogException2();
 	}
 
 	@Override
 	protected void callUserFunction1() throws Exception {
-		reduced1 = coReducer.reduce1(currentValue1, nextValue1);
-
+		coReducer.reduce1(userIterable1, collector);
 	}
 
 	@Override
 	protected void callUserFunction2() throws Exception {
-		reduced2 = coReducer.reduce2(currentValue2, nextValue2);
+		coReducer.reduce2(userIterable2, collector);
+	}
+
+	protected abstract boolean windowStart1() throws Exception;
+
+	protected abstract boolean windowStart2() throws Exception;
+
+	protected abstract boolean windowEnd1() throws Exception;
+
+	protected abstract boolean windowEnd2() throws Exception;
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		userIterable1 = new BatchIterable1();
+		userIterable2 = new BatchIterable2();
+	}
+
+	protected class BatchIterable1 implements Iterable<IN1> {
+		@Override
+		public Iterator<IN1> iterator() {
+			return userIterator1;
+		}
+	}
 
+	protected class BatchIterable2 implements Iterable<IN2> {
+		@Override
+		public Iterator<IN2> iterator() {
+			return userIterator2;
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchGroupReduceInvokable.java
new file mode 100644
index 0000000..20679dd
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchGroupReduceInvokable.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.co;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.state.MutableTableState;
+
+public class CoGroupedBatchGroupReduceInvokable<IN1, IN2, OUT> extends // variant whose reduce1()/reduce2() split the buffered records by key first
+		CoBatchGroupReduceInvokable<IN1, IN2, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private int keyPosition1; // field index of the grouping key in first-input records
+	private int keyPosition2; // field index of the grouping key in second-input records
+	private Iterator<StreamRecord<IN1>> iterator1; // iterator obtained from circularList1 in reduce1()
+	private Iterator<StreamRecord<IN2>> iterator2; // iterator obtained from circularList2 in reduce2()
+	private MutableTableState<Object, List<IN1>> values1; // key -> values collected during one reduce1() call
+	private MutableTableState<Object, List<IN2>> values2; // key -> values collected during one reduce2() call
+	private IN1 nextValue1; // value of the record currently being grouped in reduce1()
+	private IN2 nextValue2; // value of the record currently being grouped in reduce2()
+
+	public CoGroupedBatchGroupReduceInvokable(
+			CoGroupReduceFunction<IN1, IN2, OUT> coReduceFunction, long batchSize1,
+			long batchSize2, long slideSize1, long slideSize2, int keyPosition1, int keyPosition2) { // batch/slide sizes go to the superclass as window sizes and slide intervals
+		super(coReduceFunction, batchSize1, batchSize2, slideSize1, slideSize2);
+		this.keyPosition1 = keyPosition1;
+		this.keyPosition2 = keyPosition2;
+		values1 = new MutableTableState<Object, List<IN1>>();
+		values2 = new MutableTableState<Object, List<IN2>>();
+	}
+
+	@Override
+	protected void reduce1() { // groups the records in circularList1 by key, then calls the user function once per group
+		iterator1 = circularList1.getIterator();
+		while (iterator1.hasNext()) {
+			StreamRecord<IN1> nextRecord = iterator1.next();
+			Object key = nextRecord.getField(keyPosition1);
+			nextValue1 = nextRecord.getObject();
+
+			List<IN1> group = values1.get(key);
+			if (group != null) {
+				group.add(nextValue1);
+			} else { // first value seen for this key in this call: start a new list
+				group = new ArrayList<IN1>();
+				group.add(nextValue1);
+				values1.put(key, group);
+			}
+		}
+		for (List<IN1> group : values1.values()) {
+			userIterable1 = group; // replaces the inherited iterable so callUserFunction1() receives just this group
+			callUserFunctionAndLogException1();
+		}
+		values1.clear(); // drop the groups so the next call starts from an empty table
+	}
+
+	@Override
+	protected void reduce2() { // same grouping as reduce1(), applied to circularList2 and the second-input key
+		iterator2 = circularList2.getIterator();
+		while (iterator2.hasNext()) {
+			StreamRecord<IN2> nextRecord = iterator2.next();
+			Object key = nextRecord.getField(keyPosition2);
+			nextValue2 = nextRecord.getObject();
+
+			List<IN2> group = values2.get(key);
+			if (group != null) {
+				group.add(nextValue2);
+			} else { // first value seen for this key in this call: start a new list
+				group = new ArrayList<IN2>();
+				group.add(nextValue2);
+				values2.put(key, group);
+			}
+		}
+		for (List<IN2> group : values2.values()) {
+			userIterable2 = group; // replaces the inherited iterable so callUserFunction2() receives just this group
+			callUserFunctionAndLogException2();
+		}
+		values2.clear(); // drop the groups so the next call starts from an empty table
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceInvokable.java
new file mode 100644
index 0000000..0daa1ea
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceInvokable.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.co;
+
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+import org.apache.flink.streaming.state.MutableTableState;
+
+/**
+ * Grouped variant of {@link CoStreamReduceInvokable}: instead of a single
+ * running value per input it keeps one running reduced value per key and per
+ * input. For every incoming element the value stored for the element's key is
+ * combined with the element through the {@code CoReduceFunction}, stored back
+ * and the mapped result is emitted.
+ */
+public class CoGroupedReduceInvokable<IN1, IN2, OUT> extends CoStreamReduceInvokable<IN1, IN2, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	// Field positions used to extract the grouping key from each input record.
+	private int keyPosition1;
+	private int keyPosition2;
+	// Running reduced value per key, one table per input.
+	private MutableTableState<Object, IN1> values1;
+	private MutableTableState<Object, IN2> values2;
+	// Result of the most recent reduce1/reduce2 call that returned normally.
+	IN1 reduced1;
+	IN2 reduced2;
+	// Set by callUserFunction1/2 only after the reduce call returned normally, so
+	// that handleStream1/2 can tell a fresh result from a stale one.
+	private boolean reduceSucceeded1;
+	private boolean reduceSucceeded2;
+
+	/**
+	 * @param coReducer
+	 *            user function providing reduce1/reduce2 and map1/map2
+	 * @param keyPosition1
+	 *            position of the key field in records of the first input
+	 * @param keyPosition2
+	 *            position of the key field in records of the second input
+	 */
+	public CoGroupedReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, int keyPosition1,
+			int keyPosition2) {
+		// The superclass constructor already stores coReducer.
+		super(coReducer);
+		this.keyPosition1 = keyPosition1;
+		this.keyPosition2 = keyPosition2;
+		values1 = new MutableTableState<Object, IN1>();
+		values2 = new MutableTableState<Object, IN2>();
+	}
+
+	/**
+	 * Reduces the current record of the first input into the running value of
+	 * its key and emits the mapped result. The first record seen for a key is
+	 * stored and emitted as-is.
+	 */
+	@Override
+	public void handleStream1() throws Exception {
+		Object key = reuse1.getField(keyPosition1);
+		currentValue1 = values1.get(key);
+		nextValue1 = reuse1.getObject();
+		if (currentValue1 != null) {
+			reduceSucceeded1 = false;
+			// NOTE(review): the wrapper presumably logs and swallows exceptions of
+			// the user function (its catch clause is defined in CoInvokable).
+			callUserFunctionAndLogException1();
+			// Without this guard a failed reduce would store and emit reduced1 left
+			// over from an earlier element, possibly one of a different key.
+			if (reduceSucceeded1) {
+				values1.put(key, reduced1);
+				collector.collect(coReducer.map1(reduced1));
+			}
+		} else {
+			values1.put(key, nextValue1);
+			collector.collect(coReducer.map1(nextValue1));
+		}
+	}
+
+	/**
+	 * Reduces the current record of the second input into the running value of
+	 * its key and emits the mapped result. The first record seen for a key is
+	 * stored and emitted as-is.
+	 */
+	@Override
+	public void handleStream2() throws Exception {
+		Object key = reuse2.getField(keyPosition2);
+		currentValue2 = values2.get(key);
+		nextValue2 = reuse2.getObject();
+		if (currentValue2 != null) {
+			reduceSucceeded2 = false;
+			callUserFunctionAndLogException2();
+			// Same guard as for the first input: never publish a stale reduced2.
+			if (reduceSucceeded2) {
+				values2.put(key, reduced2);
+				collector.collect(coReducer.map2(reduced2));
+			}
+		} else {
+			values2.put(key, nextValue2);
+			collector.collect(coReducer.map2(nextValue2));
+		}
+	}
+
+	/**
+	 * Combines currentValue1 and nextValue1 into reduced1 and marks the result
+	 * as fresh.
+	 */
+	@Override
+	protected void callUserFunction1() throws Exception {
+		reduced1 = coReducer.reduce1(currentValue1, nextValue1);
+		reduceSucceeded1 = true;
+	}
+
+	/**
+	 * Combines currentValue2 and nextValue2 into reduced2 and marks the result
+	 * as fresh.
+	 */
+	@Override
+	protected void callUserFunction2() throws Exception {
+		reduced2 = coReducer.reduce2(currentValue2, nextValue2);
+		reduceSucceeded2 = true;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowGroupReduceInvokable.java
new file mode 100644
index 0000000..60cace6
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowGroupReduceInvokable.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.co;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.state.MutableTableState;
+
+/**
+ * Grouped variant of {@link CoWindowGroupReduceInvokable}: when a window is
+ * reduced, its records are first partitioned by a key field and the user
+ * function is called once per key group instead of once per window.
+ */
+public class CoGroupedWindowGroupReduceInvokable<IN1, IN2, OUT> extends
+		CoWindowGroupReduceInvokable<IN1, IN2, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	// Field positions used to extract the grouping key from each input record.
+	private int keyPosition1;
+	private int keyPosition2;
+	// Scratch tables (key -> values of that key); filled and cleared within a
+	// single reduce1()/reduce2() call.
+	private MutableTableState<Object, List<IN1>> values1;
+	private MutableTableState<Object, List<IN2>> values2;
+
+	/**
+	 * All window and timestamp arguments are handed to the superclass unchanged.
+	 *
+	 * @param keyPosition1
+	 *            position of the key field in records of the first input
+	 * @param keyPosition2
+	 *            position of the key field in records of the second input
+	 */
+	public CoGroupedWindowGroupReduceInvokable(
+			CoGroupReduceFunction<IN1, IN2, OUT> coReduceFunction, long windowSize1,
+			long windowSize2, long slideInterval1, long slideInterval2, int keyPosition1,
+			int keyPosition2, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
+		super(coReduceFunction, windowSize1, windowSize2, slideInterval1, slideInterval2,
+				timestamp1, timestamp2);
+		this.keyPosition1 = keyPosition1;
+		this.keyPosition2 = keyPosition2;
+		values1 = new MutableTableState<Object, List<IN1>>();
+		values2 = new MutableTableState<Object, List<IN2>>();
+	}
+
+	/**
+	 * Groups the records of circularList1 by key and calls the user function
+	 * once per group, exposing the group through userIterable1.
+	 */
+	@Override
+	protected void reduce1() {
+		groupByKey(circularList1.getIterator(), keyPosition1, values1);
+		for (List<IN1> group : values1.values()) {
+			userIterable1 = group;
+			callUserFunctionAndLogException1();
+		}
+		values1.clear();
+	}
+
+	/**
+	 * Groups the records of circularList2 by key and calls the user function
+	 * once per group, exposing the group through userIterable2.
+	 */
+	@Override
+	protected void reduce2() {
+		groupByKey(circularList2.getIterator(), keyPosition2, values2);
+		for (List<IN2> group : values2.values()) {
+			userIterable2 = group;
+			callUserFunctionAndLogException2();
+		}
+		values2.clear();
+	}
+
+	/**
+	 * Appends the value of every record to the list stored under the record's
+	 * key, creating the list on first use. The iterator and the current value
+	 * are locals, so no per-call state (in particular no live Iterator) stays
+	 * referenced from the operator instance.
+	 *
+	 * @param records
+	 *            records to distribute
+	 * @param keyPosition
+	 *            position of the key field inside each record
+	 * @param groups
+	 *            table receiving the per-key value lists
+	 */
+	private static <T> void groupByKey(Iterator<StreamRecord<T>> records, int keyPosition,
+			MutableTableState<Object, List<T>> groups) {
+		while (records.hasNext()) {
+			StreamRecord<T> record = records.next();
+			Object key = record.getField(keyPosition);
+			T value = record.getObject();
+
+			List<T> group = groups.get(key);
+			if (group != null) {
+				group.add(value);
+			} else {
+				group = new ArrayList<T>();
+				group.add(value);
+				groups.put(key, group);
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index b064df7..0e1f2b0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -85,9 +85,11 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
 			if (next == 0) {
 				break;
 			} else if (next == 1) {
+				initialize1();
 				handleStream1();
 				resetReuse1();
 			} else {
+				initialize2();
 				handleStream2();
 				resetReuse2();
 			}
@@ -100,8 +102,10 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
 			if (next == 0) {
 				break;
 			} else if (next == 1) {
+				initialize1();
 				handleStream1();
 			} else {
+				initialize2();
 				handleStream2();
 			}
 		}
@@ -114,7 +118,15 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
 	protected abstract void callUserFunction1() throws Exception;
 
 	protected abstract void callUserFunction2() throws Exception;
-	
+
+	/**
+	 * Hook called immediately before handleStream1(). The default
+	 * implementation does nothing; subclasses may override it.
+	 */
+	protected void initialize1() {
+	}
+
+	/**
+	 * Hook called immediately before handleStream2(). The default
+	 * implementation does nothing; subclasses may override it.
+	 */
+	protected void initialize2() {
+	}
+
 	protected void callUserFunctionAndLogException1() {
 		try {
 			callUserFunction1();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
deleted file mode 100644
index 407f217..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-
-public class CoReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	protected CoReduceFunction<IN1, IN2, OUT> coReducer;
-	protected IN1 currentValue1 = null;
-	protected IN2 currentValue2 = null;
-	protected IN1 nextValue1 = null;
-	protected IN2 nextValue2 = null;
-
-	public CoReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer) {
-		super(coReducer);
-		this.coReducer = coReducer;
-		currentValue1 = null;
-		currentValue2 = null;
-	}
-
-	@Override
-	public void handleStream1() throws Exception {
-		nextValue1 = reuse1.getObject();
-		callUserFunctionAndLogException1();
-	}
-
-	@Override
-	public void handleStream2() throws Exception {
-		nextValue2 = reuse2.getObject();
-		callUserFunctionAndLogException2();
-	}
-
-	@Override
-	protected void callUserFunction1() throws Exception {
-		if (currentValue1 != null) {
-			currentValue1 = coReducer.reduce1(currentValue1, nextValue1);
-		} else {
-			currentValue1 = nextValue1;
-		}
-		collector.collect(coReducer.map1(currentValue1));
-	}
-
-	@Override
-	protected void callUserFunction2() throws Exception {
-		if (currentValue2 != null) {
-			currentValue2 = coReducer.reduce2(currentValue2, nextValue2);
-		} else {
-			currentValue2 = nextValue2;
-		}
-		collector.collect(coReducer.map2(currentValue2));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceInvokable.java
new file mode 100644
index 0000000..81a6a23
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceInvokable.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.co;
+
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+
+/**
+ * Invokable that keeps one running reduced value for each of the two inputs
+ * and, after every incoming element, emits the mapped running value of the
+ * input the element arrived on.
+ */
+public class CoStreamReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	protected CoReduceFunction<IN1, IN2, OUT> coReducer;
+	// Running reduced value per input; null until the first element arrived.
+	protected IN1 currentValue1;
+	protected IN2 currentValue2;
+	// Element currently being folded into the running value.
+	protected IN1 nextValue1;
+	protected IN2 nextValue2;
+
+	/**
+	 * @param coReducer
+	 *            user function providing reduce1/reduce2 and map1/map2
+	 */
+	public CoStreamReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer) {
+		super(coReducer);
+		this.coReducer = coReducer;
+		this.currentValue1 = null;
+		this.currentValue2 = null;
+	}
+
+	/**
+	 * Takes the current object of the first input as next value and runs the
+	 * user function on it.
+	 */
+	@Override
+	public void handleStream1() throws Exception {
+		nextValue1 = reuse1.getObject();
+		callUserFunctionAndLogException1();
+	}
+
+	/**
+	 * Takes the current object of the second input as next value and runs the
+	 * user function on it.
+	 */
+	@Override
+	public void handleStream2() throws Exception {
+		nextValue2 = reuse2.getObject();
+		callUserFunctionAndLogException2();
+	}
+
+	/**
+	 * Folds nextValue1 into currentValue1 and emits the mapped running value.
+	 */
+	@Override
+	protected void callUserFunction1() throws Exception {
+		if (currentValue1 == null) {
+			// No running value yet: the element becomes the running value as-is.
+			currentValue1 = nextValue1;
+		} else {
+			currentValue1 = coReducer.reduce1(currentValue1, nextValue1);
+		}
+		collector.collect(coReducer.map1(currentValue1));
+	}
+
+	/**
+	 * Folds nextValue2 into currentValue2 and emits the mapped running value.
+	 */
+	@Override
+	protected void callUserFunction2() throws Exception {
+		if (currentValue2 == null) {
+			// No running value yet: the element becomes the running value as-is.
+			currentValue2 = nextValue2;
+		} else {
+			currentValue2 = coReducer.reduce2(currentValue2, nextValue2);
+		}
+		collector.collect(coReducer.map2(currentValue2));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowGroupReduceInvokable.java
new file mode 100644
index 0000000..49c73b5
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowGroupReduceInvokable.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.co;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+
+/**
+ * Time-driven variant of {@link CoGroupReduceInvokable}: the start and end of
+ * the window of each input are tracked as timestamps and compared against the
+ * timestamp of the element currently being processed.
+ */
+public class CoWindowGroupReduceInvokable<IN1, IN2, OUT> extends CoGroupReduceInvokable<IN1, IN2, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	protected long startTime1;
+	protected long startTime2;
+	protected long endTime1;
+	protected long endTime2;
+	// Timestamp of the element currently being processed, from either input.
+	protected long currentTime;
+	protected TimeStamp<IN1> timestamp1;
+	protected TimeStamp<IN2> timestamp2;
+
+	/**
+	 * Window sizes and slide intervals are handed to the superclass. The first
+	 * window of each input starts at the start time reported by that input's
+	 * TimeStamp and ends one window size later.
+	 */
+	public CoWindowGroupReduceInvokable(CoGroupReduceFunction<IN1, IN2, OUT> reduceFunction,
+			long windowSize1, long windowSize2, long slideInterval1, long slideInterval2,
+			TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
+		super(reduceFunction, windowSize1, windowSize2, slideInterval1, slideInterval2);
+		this.timestamp1 = timestamp1;
+		this.timestamp2 = timestamp2;
+
+		this.startTime1 = timestamp1.getStartTime();
+		this.endTime1 = this.startTime1 + windowSize1;
+
+		this.startTime2 = timestamp2.getStartTime();
+		this.endTime2 = this.startTime2 + windowSize2;
+	}
+
+	/**
+	 * Returns true, and moves startTime1 forward by slideInterval1, once
+	 * currentTime is at least slideInterval1 past startTime1.
+	 */
+	@Override
+	protected boolean windowStart1() throws Exception {
+		if (currentTime - startTime1 < slideInterval1) {
+			return false;
+		}
+		startTime1 += slideInterval1;
+		return true;
+	}
+
+	/**
+	 * Returns true, and moves startTime2 forward by slideInterval2, once
+	 * currentTime is at least slideInterval2 past startTime2.
+	 */
+	@Override
+	protected boolean windowStart2() throws Exception {
+		if (currentTime - startTime2 < slideInterval2) {
+			return false;
+		}
+		startTime2 += slideInterval2;
+		return true;
+	}
+
+	/**
+	 * Returns true, and moves endTime1 forward by slideInterval1, once
+	 * currentTime has reached endTime1.
+	 */
+	@Override
+	protected boolean windowEnd1() throws Exception {
+		if (currentTime < endTime1) {
+			return false;
+		}
+		endTime1 += slideInterval1;
+		return true;
+	}
+
+	/**
+	 * Returns true, and moves endTime2 forward by slideInterval2, once
+	 * currentTime has reached endTime2.
+	 */
+	@Override
+	protected boolean windowEnd2() throws Exception {
+		if (currentTime < endTime2) {
+			return false;
+		}
+		endTime2 += slideInterval2;
+		return true;
+	}
+
+	/**
+	 * Sets currentTime to the timestamp of the current object of the first
+	 * input.
+	 */
+	@Override
+	protected void initialize1() {
+		IN1 current = reuse1.getObject();
+		currentTime = timestamp1.getTimestamp(current);
+	}
+
+	/**
+	 * Sets currentTime to the timestamp of the current object of the second
+	 * input.
+	 */
+	@Override
+	protected void initialize2() {
+		IN2 current = reuse2.getObject();
+		currentTime = timestamp2.getTimestamp(current);
+	}
+
+	/**
+	 * Delegates to the superclass; no additional setup is done here.
+	 */
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java
new file mode 100644
index 0000000..46ccce7
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.state;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * A simple class that manages a FIFO queue whose elements are partitioned into
+ * consecutive slides. Elements are appended with {@link #add(Object)},
+ * {@link #newSlide()} closes the slide currently being filled, and
+ * {@link #shiftWindow()} removes the elements that belong to the oldest closed
+ * slide. Nothing is evicted automatically; the caller decides when to shift.
+ */
+public class CircularFifoList<T> {
+	// All buffered elements in insertion order.
+	private final Queue<T> queue;
+	// Number of elements in each closed slide, oldest slide first.
+	private final Deque<Long> slideSizes;
+	// Number of elements added since the last call to newSlide().
+	private long counter;
+
+	/**
+	 * Creates an empty list with no closed slides.
+	 */
+	public CircularFifoList() {
+		this.queue = new LinkedList<T>();
+		this.slideSizes = new ArrayDeque<Long>();
+		this.counter = 0;
+	}
+
+	/**
+	 * Appends an element to the slide currently being filled.
+	 *
+	 * @param element
+	 *            element to append
+	 */
+	public void add(T element) {
+		queue.add(element);
+		counter++;
+	}
+
+	/**
+	 * Closes the current slide: records how many elements were added since the
+	 * previous call and starts counting from zero again.
+	 */
+	public void newSlide() {
+		slideSizes.add(counter);
+		counter = 0;
+	}
+
+	/**
+	 * Removes the elements of the oldest closed slide from the head of the
+	 * queue and forgets that slide.
+	 *
+	 * @throws java.util.NoSuchElementException
+	 *             if no slide has been closed with {@link #newSlide()}
+	 */
+	public void shiftWindow() {
+		// Read the size once, as a primitive long: this avoids unboxing on every
+		// iteration and keeps the loop counter as wide as the recorded size.
+		final long oldestSlideSize = slideSizes.getFirst();
+		for (long removed = 0; removed < oldestSlideSize; removed++) {
+			queue.remove();
+		}
+		slideSizes.remove();
+	}
+
+	/**
+	 * @return an iterator over all buffered elements, oldest first
+	 */
+	public Iterator<T> getIterator() {
+		return queue.iterator();
+	}
+
+	@Override
+	public String toString() {
+		return queue.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StreamIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StreamIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StreamIterator.java
new file mode 100644
index 0000000..e3ba4b5
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StreamIterator.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.state;
+
+import java.util.Iterator;
+
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+
+/**
+ * Adapter that presents an {@code Iterator<StreamRecord<T>>} as an
+ * {@code Iterator<T>} by returning {@code getObject()} of every record. The
+ * wrapped iterator is supplied through {@link #load(Iterator)} and can be
+ * replaced by calling it again.
+ */
+public class StreamIterator<T> implements Iterator<T> {
+	// Iterator currently being adapted; null until load() has been called.
+	private Iterator<StreamRecord<T>> wrapped = null;
+
+	/**
+	 * Sets the record iterator all other methods delegate to.
+	 *
+	 * @param iterator
+	 *            record iterator to adapt
+	 */
+	public void load(Iterator<StreamRecord<T>> iterator) {
+		wrapped = iterator;
+	}
+
+	@Override
+	public boolean hasNext() {
+		return wrapped.hasNext();
+	}
+
+	/**
+	 * @return the object carried by the next record
+	 */
+	@Override
+	public T next() {
+		StreamRecord<T> record = wrapped.next();
+		return record.getObject();
+	}
+
+	@Override
+	public void remove() {
+		wrapped.remove();
+	}
+
+	@Override
+	public String toString() {
+		return wrapped.toString();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index 3861aab..54264fe 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
-import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
 import org.apache.flink.streaming.util.MockInvokable;
 import org.junit.Test;
@@ -107,13 +107,13 @@ public class AggregationFunctionTest {
 				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxFunction), getInputList());
 
 		List<Tuple2<Integer, Integer>> groupedSumList = MockInvokable.createAndExecute(
-				new GroupReduceInvokable<Tuple2<Integer, Integer>>(sumFunction, 0), getInputList());
+				new GroupedReduceInvokable<Tuple2<Integer, Integer>>(sumFunction, 0), getInputList());
 
 		List<Tuple2<Integer, Integer>> groupedMinList = MockInvokable.createAndExecute(
-				new GroupReduceInvokable<Tuple2<Integer, Integer>>(minFunction, 0), getInputList());
+				new GroupedReduceInvokable<Tuple2<Integer, Integer>>(minFunction, 0), getInputList());
 
 		List<Tuple2<Integer, Integer>> groupedMaxList = MockInvokable.createAndExecute(
-				new GroupReduceInvokable<Tuple2<Integer, Integer>>(maxFunction, 0), getInputList());
+				new GroupedReduceInvokable<Tuple2<Integer, Integer>>(maxFunction, 0), getInputList());
 
 		assertEquals(expectedSumList, sumList);
 		assertEquals(expectedMinList, minList);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchGroupReduceTest.java
new file mode 100644
index 0000000..470d956
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchGroupReduceTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoBatchGroupReduceInvokable;
+import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class CoBatchGroupReduceTest {
+
+	/**
+	 * Test function that concatenates all values of a group into one string and
+	 * emits it.
+	 */
+	public static final class MyCoGroupReduceFunction implements
+			CoGroupReduceFunction<Integer, String, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void reduce1(Iterable<Integer> values, Collector<String> out) throws Exception {
+			StringBuilder joined = new StringBuilder();
+			for (Integer number : values) {
+				joined.append(number.toString());
+			}
+			out.collect(joined.toString());
+		}
+
+		@Override
+		public void reduce2(Iterable<String> values, Collector<String> out) throws Exception {
+			StringBuilder joined = new StringBuilder();
+			for (String text : values) {
+				joined.append(text);
+			}
+			out.collect(joined.toString());
+		}
+	}
+
+	@Test
+	public void coBatchGroupReduceTest1() {
+		// Expected: disjoint groups of four numbers and of two letters.
+		runAndCompare(4L, 2L, 4L, 2L, Arrays.asList("1234", "5678", "ab", "cd", "ef", "gh"));
+	}
+
+	@Test
+	public void coBatchGroupReduceTest2() {
+		// Expected: overlapping groups, each sharing one element with the next.
+		runAndCompare(4L, 2L, 3L, 1L, Arrays.asList("1234", "4567", "78910", "ab", "bc", "cd",
+				"de", "ef", "fg", "gh"));
+	}
+
+	/**
+	 * Runs a CoBatchGroupReduceInvokable over the numbers 1..10 and the letters
+	 * a..h and compares its output with the expectation, ignoring order. The
+	 * four long arguments are passed to the invokable's constructor in the
+	 * given order.
+	 */
+	private static void runAndCompare(long size1, long size2, long slide1, long slide2,
+			List<String> expected) {
+		List<Integer> inputs1 = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+		List<String> inputs2 = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h");
+
+		CoBatchGroupReduceInvokable<Integer, String, String> invokable = new CoBatchGroupReduceInvokable<Integer, String, String>(
+				new MyCoGroupReduceFunction(), size1, size2, slide1, slide2);
+
+		List<String> actualList = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+
+		// Results of the two inputs may interleave, so compare in sorted order.
+		Collections.sort(expected);
+		Collections.sort(actualList);
+		assertEquals(expected, actualList);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
index acf2f28..264b074 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
@@ -1,4 +1,5 @@
-/** Licensed to the Apache Software Foundation (ASF) under one or more
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
@@ -12,7 +13,6 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
 package org.apache.flink.streaming.api.invokable.operator;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
deleted file mode 100644
index bf9a772..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/** Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoGroupReduceInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
-import org.junit.Test;
-
-public class CoGroupReduceTest {
-	
-	private final static class MyCoReduceFunction implements
-			CoReduceFunction<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple3<String, String, String> reduce1(Tuple3<String, String, String> value1,
-				Tuple3<String, String, String> value2) {
-			return new Tuple3<String, String, String>(value1.f0, value1.f1 + value2.f1, value1.f2);
-		}
-
-		@Override
-		public Tuple2<Integer, Integer> reduce2(Tuple2<Integer, Integer> value1,
-				Tuple2<Integer, Integer> value2) {
-			return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1);
-		}
-
-		@Override
-		public String map1(Tuple3<String, String, String> value) {
-			return value.f1;
-		}
-
-		@Override
-		public String map2(Tuple2<Integer, Integer> value) {
-			return value.f1.toString();
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void coGroupReduceTest() {
-		Tuple3<String, String, String> word1 = new Tuple3<String, String, String>("a", "word1", "b");
-		Tuple3<String, String, String> word2 = new Tuple3<String, String, String>("b", "word2", "a");
-		Tuple3<String, String, String> word3 = new Tuple3<String, String, String>("a", "word3", "a");
-		Tuple2<Integer, Integer> int1 = new Tuple2<Integer, Integer>(2, 1);
-		Tuple2<Integer, Integer> int2 = new Tuple2<Integer, Integer>(1, 2);
-		Tuple2<Integer, Integer> int3 = new Tuple2<Integer, Integer>(0, 3);
-		Tuple2<Integer, Integer> int4 = new Tuple2<Integer, Integer>(2, 4);
-		Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1, 5);
-		
-		CoGroupReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> invokable = new CoGroupReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
-				new MyCoReduceFunction(), 0, 0);
-
-		List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5",
-				"7");
-
-		List<String> actualList = MockCoInvokable.createAndExecute(invokable,
-				Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
-
-		assertEquals(expected, actualList);
-		
-	
-		invokable = new CoGroupReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
-				new MyCoReduceFunction(), 2, 0);
-
-		expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5",
-				"7");
-		
-		actualList = MockCoInvokable.createAndExecute(invokable,
-				Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
-		
-		assertEquals(expected, actualList);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchGroupReduceTest.java
new file mode 100644
index 0000000..f150d30
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchGroupReduceTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchGroupReduceInvokable;
+import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class CoGroupedBatchGroupReduceTest { // tests CoGroupedBatchGroupReduceInvokable on two tuple inputs via MockCoInvokable
+
+	public static final class MyCoGroupReduceFunction implements
+			CoGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, String>, String> { // emits one concatenated String per group of values
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void reduce1(Iterable<Tuple2<Integer, Integer>> values, Collector<String> out)
+				throws Exception { // first input: joins the decimal text of every f1 in iteration order
+			String gather = "";
+			for (Tuple2<Integer, Integer> value : values) {
+				gather += value.f1.toString();
+			}
+			out.collect(gather); // exactly one output per call, even for an empty group ("")
+		}
+
+		@Override
+		public void reduce2(Iterable<Tuple2<Integer, String>> values, Collector<String> out)
+				throws Exception { // second input: joins every f1 string in iteration order
+			String gather = "";
+			for (Tuple2<Integer, String> value : values) {
+				gather += value.f1;
+			}
+			out.collect(gather);
+		}
+	}
+
+	final static int KEY_POSITION1 = 0; // passed as the 6th constructor argument; presumably the key field index for input 1 -- confirm
+	final static int KEY_POSITION2 = 0; // passed as the 7th constructor argument; presumably the key field index for input 2 -- confirm
+
+	@Test
+	public void coGroupedBatchGroupReduceTest1() { // expected groups do not share elements between outputs
+
+		List<Tuple2<Integer, Integer>> inputs1 = new ArrayList<Tuple2<Integer, Integer>>();
+		for (Integer i = 1; i <= 10; i++) {
+			inputs1.add(new Tuple2<Integer, Integer>(i % 3, i)); // f0 = i mod 3, f1 = i
+		}
+
+		List<Tuple2<Integer, String>> inputs2 = new ArrayList<Tuple2<Integer, String>>();
+		for (char ch = 'a'; ch <= 'h'; ch++) {
+			inputs2.add(new Tuple2<Integer, String>(((int) ch) % 3, ch + "")); // f0 = char code mod 3 ('a' is 97, so 1), f1 = the letter
+		}
+
+		CoGroupedBatchGroupReduceInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, String>, String> invokable = new CoGroupedBatchGroupReduceInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, String>, String>(
+				new MyCoGroupReduceFunction(), 5L, 4L, 5L, 4L, KEY_POSITION1, KEY_POSITION2); // presumably (batchSize1, batchSize2, slideSize1, slideSize2, keys) -- TODO confirm
+
+		List<String> expected = Arrays.asList("14", "25", "3", "69", "710", "8", "ad", "b", "c", // consistent with grouping by f0 inside runs of 5 ints / 4 letters
+				"eh", "f", "g");
+
+		List<String> actualList = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+		Collections.sort(expected); // both lists are sorted before comparing, so emission order is not asserted
+		Collections.sort(actualList);
+
+		assertEquals(expected, actualList);
+	}
+
+	@Test
+	public void coGroupedBatchGroupReduceTest2() { // same inputs as test 1, different size arguments; expected groups overlap
+
+		List<Tuple2<Integer, Integer>> inputs1 = new ArrayList<Tuple2<Integer, Integer>>();
+		for (Integer i = 1; i <= 10; i++) {
+			inputs1.add(new Tuple2<Integer, Integer>(i % 3, i)); // f0 = i mod 3, f1 = i
+		}
+
+		List<Tuple2<Integer, String>> inputs2 = new ArrayList<Tuple2<Integer, String>>();
+		for (char ch = 'a'; ch <= 'h'; ch++) {
+			inputs2.add(new Tuple2<Integer, String>(((int) ch) % 3, ch + "")); // f0 = char code mod 3, f1 = the letter
+		}
+
+		CoGroupedBatchGroupReduceInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, String>, String> invokable = new CoGroupedBatchGroupReduceInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, String>, String>(
+				new MyCoGroupReduceFunction(), 6L, 6L, 3L, 2L, KEY_POSITION1, KEY_POSITION2); // presumably batches of 6 advancing by 3 (input 1) and by 2 (input 2) -- TODO confirm
+
+		List<String> expected = Arrays.asList("14", "25", "36", "47", "58", "69", "ad", "be", "cf", // the repeated "cf" is intentional, not a typo:
+				"cf", "dg", "eh"); // consistent with c and f falling into two overlapping letter runs (a..f and c..h)
+
+		List<String> actualList = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+		Collections.sort(expected); // both lists are sorted before comparing, so emission order is not asserted
+		Collections.sort(actualList);
+
+		assertEquals(expected, actualList);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
new file mode 100644
index 0000000..9842d1c
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
+import org.apache.flink.streaming.util.MockCoInvokable;
+import org.junit.Test;
+
+public class CoGroupedReduceTest { // tests CoGroupedReduceInvokable with two different key arguments for the first input
+
+	private final static class MyCoReduceFunction implements
+			CoReduceFunction<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> { // reduces each input separately, maps both to String
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple3<String, String, String> reduce1(Tuple3<String, String, String> value1,
+				Tuple3<String, String, String> value2) {
+			return new Tuple3<String, String, String>(value1.f0, value1.f1 + value2.f1, value1.f2); // keeps f0/f2 of value1, concatenates the f1 words
+		}
+
+		@Override
+		public Tuple2<Integer, Integer> reduce2(Tuple2<Integer, Integer> value1,
+				Tuple2<Integer, Integer> value2) {
+			return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1); // keeps f0 of value1, sums the f1 values
+		}
+
+		@Override
+		public String map1(Tuple3<String, String, String> value) {
+			return value.f1; // output for input 1 is the (accumulated) word
+		}
+
+		@Override
+		public String map2(Tuple2<Integer, Integer> value) {
+			return value.f1.toString(); // output for input 2 is the (accumulated) sum as text
+		}
+	}
+
+	@SuppressWarnings("unchecked") // presumably for the generic varargs arrays created by Arrays.asList(word1, ...) below
+	@Test
+	public void coGroupedReduceTest() { // runs the same data twice; only the second constructor argument changes (0, then 2)
+		Tuple3<String, String, String> word1 = new Tuple3<String, String, String>("a", "word1", "b");
+		Tuple3<String, String, String> word2 = new Tuple3<String, String, String>("b", "word2", "a");
+		Tuple3<String, String, String> word3 = new Tuple3<String, String, String>("a", "word3", "a"); // shares f0 with word1 and f2 with word2
+		Tuple2<Integer, Integer> int1 = new Tuple2<Integer, Integer>(2, 1);
+		Tuple2<Integer, Integer> int2 = new Tuple2<Integer, Integer>(1, 2);
+		Tuple2<Integer, Integer> int3 = new Tuple2<Integer, Integer>(0, 3);
+		Tuple2<Integer, Integer> int4 = new Tuple2<Integer, Integer>(2, 4); // same f0 as int1
+		Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1, 5); // same f0 as int2
+
+		CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> invokable = new CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
+				new MyCoReduceFunction(), 0, 0); // presumably (keyPosition1, keyPosition2) -- TODO confirm against the constructor
+
+		List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5", // "word1word3": word1 and word3 share f0; "5" = 1 + 4
+				"7"); // "7" = 2 + 5
+
+		List<String> actualList = MockCoInvokable.createAndExecute(invokable,
+				Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
+
+		assertEquals(expected, actualList); // no sorting here: the exact output order is asserted
+
+		invokable = new CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
+				new MyCoReduceFunction(), 2, 0); // second run: first input now uses 2 instead of 0
+
+		expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5", "7"); // "word2word3": word2 and word3 share f2
+
+		actualList = MockCoInvokable.createAndExecute(invokable,
+				Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
+
+		assertEquals(expected, actualList);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowGroupReduceTest.java
new file mode 100644
index 0000000..b421284
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowGroupReduceTest.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class CoGroupedWindowGroupReduceTest { // tests CoGroupedWindowGroupReduceInvokable with scripted timestamps via MockCoInvokable
+
+	public static final class MyCoGroupReduceFunction1 implements
+			CoGroupReduceFunction<Character, Character, String> { // emits "<first element>:<element count>" for each group, same logic for both inputs
+		private static final long serialVersionUID = 1L;
+
+		@SuppressWarnings("unused") // the loop variable below is only used for counting
+		@Override
+		public void reduce1(Iterable<Character> values, Collector<String> out) throws Exception {
+			Integer gather = 0;
+			Character ch = values.iterator().next(); // peeks the first element with a separate iterator; throws NoSuchElementException on an empty group
+			for (Character value : values) { // assumes values can be iterated a second time from the start -- confirm for the Iterable passed in
+				gather++;
+			}
+			out.collect(ch + ":" + gather);
+		}
+
+		@SuppressWarnings("unused") // the loop variable below is only used for counting
+		@Override
+		public void reduce2(Iterable<Character> values, Collector<String> out) throws Exception {
+			Integer gather = 0;
+			Character ch = values.iterator().next(); // peeks the first element with a separate iterator; throws NoSuchElementException on an empty group
+			for (Character value : values) { // assumes values can be iterated a second time from the start -- confirm for the Iterable passed in
+				gather++;
+			}
+			out.collect(ch + ":" + gather);
+		}
+	}
+
+	public static final class MyCoGroupReduceFunction2 implements
+			CoGroupReduceFunction<Tuple2<String, Integer>, Tuple2<Integer, Integer>, String> { // input 1: joins f0 strings; input 2: sums f0 ints
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void reduce1(Iterable<Tuple2<String, Integer>> values, Collector<String> out)
+				throws Exception {
+			String gather = "";
+			for (Tuple2<String, Integer> value : values) {
+				gather += value.f0; // f1 is not read here
+			}
+			out.collect(gather);
+		}
+
+		@Override
+		public void reduce2(Iterable<Tuple2<Integer, Integer>> values, Collector<String> out)
+				throws Exception {
+			Integer gather = 0;
+			for (Tuple2<Integer, Integer> value : values) {
+				gather += value.f0; // f1 is not read here
+			}
+			out.collect(gather.toString()); // the sum is emitted as decimal text
+		}
+	}
+
+	public static final class MyTimeStamp<T> implements TimeStamp<T> { // scripted clock: hands out a fixed list of timestamps in call order
+		private static final long serialVersionUID = 1L;
+
+		private Iterator<Long> timestamps; // NOTE(review): java.util.Iterator is not Serializable; if TimeStamp is Serializable this field would break serialization -- confirm
+		private long start;
+
+		public MyTimeStamp(List<Long> timestamps) {
+			this.timestamps = timestamps.iterator();
+			this.start = timestamps.get(0); // start time is the first scripted timestamp; an empty list throws here
+		}
+
+		@Override
+		public long getTimestamp(T value) { // ignores value: the n-th call returns the n-th scripted timestamp
+			long ts = timestamps.next(); // throws NoSuchElementException once the script is exhausted
+			return ts;
+		}
+
+		@Override
+		public long getStartTime() {
+			return start;
+		}
+	}
+
+	@Test
+	public void coGroupedWindowGroupReduceTest1() { // Character inputs, counted per distinct character
+
+		List<Character> inputs1 = new ArrayList<Character>();
+		inputs1.add('a');
+		inputs1.add('b');
+		inputs1.add('c');
+		inputs1.add('a');
+		inputs1.add('a');
+		inputs1.add('c');
+		inputs1.add('b');
+		inputs1.add('c');
+		inputs1.add('a');
+		inputs1.add('a');
+		inputs1.add('x'); // 'x' never shows up in expected; presumably only there to push time past the last window -- confirm
+
+		List<Character> inputs2 = new ArrayList<Character>();
+		inputs2.add('a');
+		inputs2.add('d');
+		inputs2.add('d');
+		inputs2.add('e');
+		inputs2.add('d');
+		inputs2.add('e');
+		inputs2.add('e');
+		inputs2.add('a');
+		inputs2.add('a');
+		inputs2.add('x'); // 'x' never shows up in expected; presumably only there to push time past the last window -- confirm
+
+		List<Long> timestamps1 = Arrays.asList(0L, 1L, 1L, 1L, 2L, 2L, 2L, 3L, 8L, 10L, 11L); // one entry per element of inputs1 (11)
+
+		List<Long> timestamps2 = Arrays.asList(0L, 5L, 5L, 6L, 6L, 7L, 7L, 8L, 8L, 10L); // one entry per element of inputs2 (10)
+
+		CoGroupedWindowGroupReduceInvokable<Character, Character, String> invokable = new CoGroupedWindowGroupReduceInvokable<Character, Character, String>(
+				new MyCoGroupReduceFunction1(), 5L, 5L, 3L, 5L, 0, 0, new MyTimeStamp<Character>( // presumably (windowSize1, windowSize2, slide1, slide2, keyPosition1, keyPosition2) -- TODO confirm
+						timestamps1), new MyTimeStamp<Character>(timestamps2));
+
+		List<String> expected = new ArrayList<String>();
+		expected.add("a:3");
+		expected.add("b:2");
+		expected.add("c:3");
+		expected.add("c:1");
+		expected.add("a:2");
+		expected.add("a:1");
+		expected.add("a:2");
+		expected.add("d:3");
+		expected.add("e:3");
+
+		List<String> actualList = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+		Collections.sort(expected); // both lists are sorted before comparing, so emission order is not asserted
+		Collections.sort(actualList);
+
+		assertEquals(expected, actualList);
+	}
+
+	@Test
+	public void coGroupedWindowGroupReduceTest2() { // Tuple2 inputs; the reducers aggregate f0 of each tuple
+
+		List<Tuple2<String, Integer>> inputs1 = new ArrayList<Tuple2<String, Integer>>();
+		inputs1.add(new Tuple2<String, Integer>("a", 1));
+		inputs1.add(new Tuple2<String, Integer>("b", 1));
+		inputs1.add(new Tuple2<String, Integer>("c", 0));
+		inputs1.add(new Tuple2<String, Integer>("d", 0));
+		inputs1.add(new Tuple2<String, Integer>("e", 1));
+		inputs1.add(new Tuple2<String, Integer>("f", 1));
+		inputs1.add(new Tuple2<String, Integer>("g", 0));
+		inputs1.add(new Tuple2<String, Integer>("h", 0));
+		inputs1.add(new Tuple2<String, Integer>("i", 1));
+		inputs1.add(new Tuple2<String, Integer>("j", 1)); // "j" does not occur in any expected string
+
+		List<Tuple2<Integer, Integer>> inputs2 = new ArrayList<Tuple2<Integer, Integer>>();
+		inputs2.add(new Tuple2<Integer, Integer>(1, 1));
+		inputs2.add(new Tuple2<Integer, Integer>(2, 2));
+		inputs2.add(new Tuple2<Integer, Integer>(3, 1));
+		inputs2.add(new Tuple2<Integer, Integer>(4, 2));
+		inputs2.add(new Tuple2<Integer, Integer>(5, 1));
+		inputs2.add(new Tuple2<Integer, Integer>(6, 2));
+		inputs2.add(new Tuple2<Integer, Integer>(7, 1));
+		inputs2.add(new Tuple2<Integer, Integer>(8, 2));
+		inputs2.add(new Tuple2<Integer, Integer>(9, 1));
+		inputs2.add(new Tuple2<Integer, Integer>(10, 2));
+
+		List<Long> timestamps1 = Arrays.asList(0L, 1L, 1L, 1L, 2L, 2L, 2L, 3L, 4L, 7L); // one entry per element of inputs1 (10)
+
+		List<Long> timestamps2 = Arrays.asList(0L, 5L, 5L, 6L, 6L, 7L, 7L, 8L, 8L, 10L); // one entry per element of inputs2 (10)
+
+		CoGroupedWindowGroupReduceInvokable<Tuple2<String, Integer>, Tuple2<Integer, Integer>, String> invokable = new CoGroupedWindowGroupReduceInvokable<Tuple2<String, Integer>, Tuple2<Integer, Integer>, String>(
+				new MyCoGroupReduceFunction2(), 2L, 4L, 2L, 2L, 1, 1, // presumably (windowSize1, windowSize2, slide1, slide2, keyPosition1, keyPosition2) -- TODO confirm
+				new MyTimeStamp<Tuple2<String, Integer>>(timestamps1),
+				new MyTimeStamp<Tuple2<Integer, Integer>>(timestamps2));
+
+		List<String> expected = new ArrayList<String>();
+		expected.add("ab"); // letter strings come from reduce1 (input 1)
+		expected.add("cd");
+		expected.add("ef");
+		expected.add("gh");
+		expected.add("i");
+		expected.add("1"); // numeric strings come from reduce2 (input 2): sums of f0 rendered as text
+		expected.add("3");
+		expected.add("2");
+		expected.add("15");
+		expected.add("12");
+		expected.add("21");
+		expected.add("18");
+
+		List<String> actualList = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+		Collections.sort(expected); // lexicographic String sort on both sides; emission order is not asserted
+		Collections.sort(actualList);
+
+		assertEquals(expected, actualList);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoReduceTest.java
deleted file mode 100755
index 4f52444..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoReduceTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/** Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
-import org.junit.Test;
-
-public class CoReduceTest {
-
-	public static class MyCoReduceFunction implements CoReduceFunction<Integer, String, Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer reduce1(Integer value1, Integer value2) {
-			return value1 * value2;
-		}
-
-		@Override
-		public String reduce2(String value1, String value2) {
-			return value1 + value2;
-		}
-
-		@Override
-		public Integer map1(Integer value) {
-			return value;
-		}
-
-		@Override
-		public Integer map2(String value) {
-			return Integer.parseInt(value);
-		}
-
-	}
-
-	@Test
-	public void coGroupReduceTest() {
-
-		CoReduceInvokable<Integer, String, Integer> coReduce = new CoReduceInvokable<Integer, String, Integer>(
-				new MyCoReduceFunction());
-
-		List<Integer> expected1 = Arrays.asList(1, 9, 2, 99, 6, 998, 24);
-
-		assertEquals(
-				expected1,
-				(MockCoInvokable.createAndExecute(coReduce, Arrays.asList(1, 2, 3, 4),
-						Arrays.asList("9", "9", "8"))));
-
-	}
-}