You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/20 15:10:51 UTC

[08/18] git commit: [FLINK-1080] [streaming] Streaming aggregation update and refactor

[FLINK-1080] [streaming] Streaming aggregation update and refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/bcbebed0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/bcbebed0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/bcbebed0

Branch: refs/heads/master
Commit: bcbebed01537ee7a414cd51428f4a3b8e8fc23e1
Parents: 4d73f51
Author: gyfora <gy...@gmail.com>
Authored: Fri Sep 5 16:07:59 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Sat Sep 20 13:42:04 2014 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    | 116 +++++++++++---
 .../api/datastream/GroupedDataStream.java       |  71 +++++++--
 .../aggregation/AggregationFunction.java        |  34 +++++
 .../ComparableAggregationFunction.java          |  12 +-
 .../aggregation/MaxAggregationFunction.java     |  32 ++++
 .../aggregation/MinAggregationFunction.java     |  32 ++++
 .../StreamingAggregationFunction.java           |  45 ------
 .../StreamingMaxAggregationFunction.java        |  32 ----
 .../StreamingMinAggregationFunction.java        |  32 ----
 .../StreamingSumAggregationFunction.java        |  64 --------
 .../aggregation/SumAggregationFunction.java     | 150 +++++++++++++++++++
 .../operator/BatchGroupReduceInvokable.java     |   1 -
 .../api/invokable/operator/BatchIterator.java   |   3 +-
 .../operator/WindowGroupReduceInvokable.java    |   1 -
 .../streaming/api/AggregationFunctionTest.java  |  92 +++++++++---
 .../api/invokable/operator/CoFlatMapTest.java   |   6 -
 16 files changed, 479 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 28d07d6..d78ceae 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -34,12 +34,13 @@ import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.functions.RichReduceFunction;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.aggregation.StreamingMaxAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.StreamingMinAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.StreamingAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.StreamingSumAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
 import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
@@ -171,7 +172,43 @@ public abstract class DataStream<OUT> {
 	public TypeInformation<OUT> getOutputType() {
 		return this.outTypeWrapper.getTypeInfo();
 	}
-	
+
+	/**
+	 * Gets the class of the field at the given position
+	 * 
+	 * @param pos
+	 *            Position of the field
+	 * @return The class of the field
+	 */
+	@SuppressWarnings("rawtypes")
+	protected Class<?> getClassAtPos(int pos) {
+		Class<?> type;
+		TypeInformation<OUT> outTypeInfo = outTypeWrapper.getTypeInfo();
+		if (outTypeInfo.isTupleType()) {
+			type = ((TupleTypeInfo) outTypeInfo).getTypeAt(pos).getTypeClass();
+		} else if (pos == 0) {
+			type = outTypeInfo.getTypeClass();
+		} else {
+			throw new IndexOutOfBoundsException("Position is out of range");
+		}
+		return type;
+	}
+
+	/**
+	 * Checks if the given field position is allowed for the output type
+	 * 
+	 * @param pos
+	 *            Position to check
+	 */
+	protected void checkFieldRange(int pos) {
+		try {
+			getClassAtPos(pos);
+		} catch (IndexOutOfBoundsException e) {
+			throw new RuntimeException("Selected field is out of range");
+
+		}
+	}
+
 	/**
 	 * Creates a new {@link MergedDataStream} by merging {@link DataStream}
 	 * outputs of the same type with each other. The DataStreams merged using
@@ -483,50 +520,82 @@ public abstract class DataStream<OUT> {
 	}
 
 	/**
-	 * Applies an aggregation that sums the data stream at the given
-	 * position.
+	 * Applies an aggregation that sums the data stream at the given position.
 	 * 
 	 * @param positionToSum
 	 *            The position in the data point to sum
 	 * @return The transformed DataStream.
 	 */
+	@SuppressWarnings("unchecked")
 	public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
-		return aggregateAll(new StreamingSumAggregationFunction<OUT>(positionToSum));
+		checkFieldRange(positionToSum);
+		return aggregateAll((AggregationFunction<OUT>) SumAggregationFunction
+				.getSumFunction(positionToSum, getClassAtPos(positionToSum)));
+	}
+
+	/**
+	 * Applies an aggregation that sums the data stream at the first position .
+	 * 
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> sum() {
+		return sum(0);
 	}
-	
+
 	/**
-	 * Applies an aggregation that that gives the minimum of the data stream at the given
-	 * position.
+	 * Applies an aggregation that that gives the minimum of the data stream at
+	 * the given position.
 	 * 
 	 * @param positionToMin
 	 *            The position in the data point to minimize
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
-		return aggregateAll(new StreamingMinAggregationFunction<OUT>(positionToMin));
+		checkFieldRange(positionToMin);
+		return aggregateAll(new MinAggregationFunction<OUT>(positionToMin));
 	}
-	
+
 	/**
-	 * Applies an aggregation that gives the maximum of the data stream at the given
-	 * position.
+	 * Applies an aggregation that that gives the minimum of the data stream at
+	 * the first position.
+	 * 
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> min() {
+		return min(0);
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum of the data stream at the
+	 * given position.
 	 * 
 	 * @param positionToMax
 	 *            The position in the data point to maximize
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
-		return aggregateAll(new StreamingMaxAggregationFunction<OUT>(positionToMax));
+		checkFieldRange(positionToMax);
+		return aggregateAll(new MaxAggregationFunction<OUT>(positionToMax));
 	}
 
-	private SingleOutputStreamOperator<OUT, ?> aggregateAll(StreamingAggregationFunction<OUT> aggregate) {
+	/**
+	 * Applies an aggregation that gives the maximum of the data stream at the
+	 * first position.
+	 * 
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> max() {
+		return max(0);
+	}
+
+	private SingleOutputStreamOperator<OUT, ?> aggregateAll(
+			AggregationFunction<OUT> aggregate) {
 		return aggregate(aggregate, new StreamReduceInvokable<OUT>(aggregate), "reduce");
 	}
-	
-	SingleOutputStreamOperator<OUT, ?> aggregate(StreamingAggregationFunction<OUT> aggregate, StreamReduceInvokable<OUT> invokable, String functionName) {
-		DataStream<OUT> inputStream = this.copy();
-		TypeInformation<?> info = this.jobGraphBuilder.getOutTypeInfo(inputStream.getId());
 
-		aggregate.setType(info);
+	SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate,
+			StreamReduceInvokable<OUT> invokable, String functionName) {
+		DataStream<OUT> inputStream = this.copy();
 
 		SingleOutputStreamOperator<OUT, ?> returnStream = inputStream.addFunction(functionName,
 				aggregate, null, null, invokable);
@@ -1014,7 +1083,8 @@ public abstract class DataStream<OUT> {
 
 	private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream,
 			SinkFunction<OUT> sinkFunction, TypeSerializerWrapper<OUT> typeWrapper) {
-		DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", outTypeWrapper);
+		DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink",
+				outTypeWrapper);
 
 		try {
 			jobGraphBuilder.addSink(returnStream.getId(), new SinkInvokable<OUT>(sinkFunction),

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 2e1ed57..06bec0a 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -21,10 +21,10 @@ import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.RichReduceFunction;
-import org.apache.flink.streaming.api.function.aggregation.StreamingAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.StreamingMaxAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.StreamingMinAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.StreamingSumAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
 import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
@@ -52,14 +52,19 @@ public class GroupedDataStream<OUT> {
 	}
 
 	/**
-	 * Gets the output type.
+	 * Applies a reduce transformation on the grouped data stream grouped by the
+	 * given key position. The {@link ReduceFunction} will receive input values
+	 * based on the key value. Only input values with the same key will go to
+	 * the same reducer.The user can also extend {@link RichReduceFunction} to
+	 * gain access to other features provided by the {@link RichFuntion}
+	 * interface. Gets the output type.
 	 * 
 	 * @return The output type.
 	 */
 	public TypeInformation<OUT> getOutputType() {
 		return dataStream.getOutputType();
 	}
-	
+
 	/**
 	 * Applies a reduce transformation on the grouped data stream grouped on by
 	 * the given key position. The {@link ReduceFunction} will receive input
@@ -78,7 +83,7 @@ public class GroupedDataStream<OUT> {
 				ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
 				ReduceFunction.class, 0), new GroupReduceInvokable<OUT>(reducer, keyPosition));
 	}
-	
+
 	/**
 	 * Applies a group reduce transformation on preset chunks of the grouped
 	 * data stream. The {@link GroupReduceFunction} will receive input values
@@ -214,8 +219,22 @@ public class GroupedDataStream<OUT> {
 	 *            The position in the data point to sum
 	 * @return The transformed DataStream.
 	 */
+	@SuppressWarnings("unchecked")
 	public SingleOutputStreamOperator<OUT, ?> sum(final int positionToSum) {
-		return aggregateGroup(new StreamingSumAggregationFunction<OUT>(positionToSum));
+		dataStream.checkFieldRange(positionToSum);
+		return aggregateGroup((AggregationFunction<OUT>) SumAggregationFunction
+				.getSumFunction(positionToSum, dataStream.getClassAtPos(positionToSum)));
+	}
+
+	/**
+	 * Applies an aggregation that sums the grouped data stream at the first
+	 * position, grouped by the given key position. Input values with the same
+	 * key will be summed.
+	 * 
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> sum() {
+		return sum(0);
 	}
 
 	/**
@@ -228,7 +247,21 @@ public class GroupedDataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> min(final int positionToMin) {
-		return aggregateGroup(new StreamingMinAggregationFunction<OUT>(positionToMin));
+		dataStream.checkFieldRange(positionToMin);
+		return aggregateGroup(new MinAggregationFunction<OUT>(positionToMin));
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum of the grouped data stream
+	 * at the first position, grouped by the given key position. Input values
+	 * with the same key will be minimized.
+	 * 
+	 * @param positionToMin
+	 *            The position in the data point to minimize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> min() {
+		return min(0);
 	}
 
 	/**
@@ -241,10 +274,24 @@ public class GroupedDataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> max(final int positionToMax) {
-		return aggregateGroup(new StreamingMaxAggregationFunction<OUT>(positionToMax));
+		dataStream.checkFieldRange(positionToMax);
+		return aggregateGroup(new MaxAggregationFunction<OUT>(positionToMax));
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum of the grouped data stream
+	 * at the first position, grouped by the given key position. Input values
+	 * with the same key will be maximized.
+	 * 
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> max() {
+		return max(0);
 	}
 
-	private SingleOutputStreamOperator<OUT, ?> aggregateGroup(StreamingAggregationFunction<OUT> aggregate) {
-		return this.dataStream.aggregate(aggregate, new GroupReduceInvokable<OUT>(aggregate, keyPosition), "groupReduce");
+	private SingleOutputStreamOperator<OUT, ?> aggregateGroup(
+			AggregationFunction<OUT> aggregate) {
+		return this.dataStream.aggregate(aggregate, new GroupReduceInvokable<OUT>(aggregate,
+				keyPosition), "groupReduce");
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java
new file mode 100644
index 0000000..daae0b8
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+
+public abstract class AggregationFunction<T> implements ReduceFunction<T> {
+	private static final long serialVersionUID = 1L;
+	
+	public int position;
+	protected Tuple returnTuple;
+
+	public AggregationFunction(int pos) {
+		this.position = pos;
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java
index dc74715..0819340 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.function.aggregation;
 
 import org.apache.flink.api.java.tuple.Tuple;
 
-public abstract class ComparableAggregationFunction<T> extends StreamingAggregationFunction<T> {
+public abstract class ComparableAggregationFunction<T> extends AggregationFunction<T> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -39,25 +39,25 @@ public abstract class ComparableAggregationFunction<T> extends StreamingAggregat
 			return (T) returnTuple;
 		} else if (value1 instanceof Comparable) {
 			if (isExtremal((Comparable<Object>) value1, value2)) {
-				value2 = value1;
+				return value1;
+			}else{
+				return value2;
 			}
 		} else {
 			throw new RuntimeException("The values " + value1 +  " and "+ value2 + " cannot be compared.");
 		}
-
-		return null;
 	}
 
 	public <R> void compare(Tuple tuple1, Tuple tuple2) throws InstantiationException,
 			IllegalAccessException {
-		copyTuple(tuple2);
 
 		Comparable<R> o1 = tuple1.getField(position);
 		R o2 = tuple2.getField(position);
 
 		if (isExtremal(o1, o2)) {
-			returnTuple.setField(o1, position);
+			tuple2.setField(o1, position);
 		}
+		returnTuple = tuple2;
 	}
 
 	public abstract <R> boolean isExtremal(Comparable<R> o1, R o2);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxAggregationFunction.java
new file mode 100644
index 0000000..521fff6
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxAggregationFunction.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+public class MaxAggregationFunction<T> extends ComparableAggregationFunction<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	public MaxAggregationFunction(int pos) {
+		super(pos);
+	}
+
+	@Override
+	public <R> boolean isExtremal(Comparable<R> o1, R o2) {
+		return o1.compareTo(o2) > 0;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinAggregationFunction.java
new file mode 100644
index 0000000..a01d6c0
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinAggregationFunction.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+public class MinAggregationFunction<T> extends ComparableAggregationFunction<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	public MinAggregationFunction(int pos) {
+		super(pos);
+	}
+
+	@Override
+	public <R> boolean isExtremal(Comparable<R> o1, R o2) {
+		return o1.compareTo(o2) < 0;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingAggregationFunction.java
deleted file mode 100644
index 42c1053..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingAggregationFunction.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.aggregation;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.types.TypeInformation;
-
-public abstract class StreamingAggregationFunction<T> implements ReduceFunction<T> {
-	private static final long serialVersionUID = 1L;
-	
-	protected int position;
-	private TypeSerializer<Tuple> typeSerializer;
-	protected Tuple returnTuple;
-
-	public StreamingAggregationFunction(int pos) {
-		this.position = pos;
-	}
-
-	@SuppressWarnings("unchecked")
-	public void setType(TypeInformation<?> type) {
-		this.typeSerializer = (TypeSerializer<Tuple>) type.createSerializer();
-	}
-
-	protected void copyTuple(Tuple tuple) throws InstantiationException, IllegalAccessException {
-		returnTuple = (Tuple) typeSerializer.createInstance();
-		typeSerializer.copy(tuple, returnTuple);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMaxAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMaxAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMaxAggregationFunction.java
deleted file mode 100644
index bae0043..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMaxAggregationFunction.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.aggregation;
-
-public class StreamingMaxAggregationFunction<T> extends ComparableAggregationFunction<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	public StreamingMaxAggregationFunction(int pos) {
-		super(pos);
-	}
-
-	@Override
-	public <R> boolean isExtremal(Comparable<R> o1, R o2) {
-		return o1.compareTo(o2) > 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMinAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMinAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMinAggregationFunction.java
deleted file mode 100644
index eb349c6..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingMinAggregationFunction.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.aggregation;
-
-public class StreamingMinAggregationFunction<T> extends ComparableAggregationFunction<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	public StreamingMinAggregationFunction(int pos) {
-		super(pos);
-	}
-
-	@Override
-	public <R> boolean isExtremal(Comparable<R> o1, R o2) {
-		return o1.compareTo(o2) < 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingSumAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingSumAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingSumAggregationFunction.java
deleted file mode 100644
index 1a043c1..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/StreamingSumAggregationFunction.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.aggregation;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-public class StreamingSumAggregationFunction<T> extends StreamingAggregationFunction<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	public StreamingSumAggregationFunction(int pos) {
-		super(pos);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public T reduce(T value1, T value2) throws Exception {
-		if (value1 instanceof Tuple) {
-			Tuple tuple1 = (Tuple) value1;
-			Tuple tuple2 = (Tuple) value2;
-
-			copyTuple(tuple2);
-			returnTuple.setField(add(tuple1.getField(position), tuple2.getField(position)), position);
-
-			return (T) returnTuple;
-		} else {
-			return (T) add(value1, value2);
-		}
-	}
-
-	private Object add(Object value1, Object value2) {
-		if (value1 instanceof Integer) {
-			return (Integer) value1 + (Integer) value2;
-		} else if (value1 instanceof Double) {
-			return (Double) value1 + (Double) value2;
-		} else if (value1 instanceof Float) {
-			return (Float) value1 + (Float) value2;
-		} else if (value1 instanceof Long) {
-			return (Long) value1 + (Long) value2;
-		} else if (value1 instanceof Short) {
-			return (short) ((Short) value1 + (Short) value2);
-		} else if (value1 instanceof Byte) {
-			return (byte) ((Byte) value1 + (Byte) value2);
-		} else {
-			throw new RuntimeException("DataStream cannot be summed because the class "
-					+ value1.getClass().getSimpleName() + " does not support the + operator.");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregationFunction.java
new file mode 100644
index 0000000..3f54590
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregationFunction.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	public SumAggregationFunction(int pos) {
+		super(pos);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public T reduce(T value1, T value2) throws Exception {
+		if (value1 instanceof Tuple) {
+			Tuple tuple1 = (Tuple) value1;
+			Tuple tuple2 = (Tuple) value2;
+
+			returnTuple = tuple2;
+			returnTuple.setField(add(tuple1.getField(position), tuple2.getField(position)),
+					position);
+
+			return (T) returnTuple;
+		} else {
+			return (T) add(value1, value2);
+		}
+	}
+
+	protected abstract Object add(Object value1, Object value2);
+
+	@SuppressWarnings("rawtypes")
+	public static <T> SumAggregationFunction getSumFunction(int pos, Class<T> type) {
+
+		if (type == Integer.class) {
+			return new IntSum<T>(pos);
+		} else if (type == Long.class) {
+			return new LongSum<T>(pos);
+		} else if (type == Short.class) {
+			return new ShortSum<T>(pos);
+		} else if (type == Double.class) {
+			return new DoubleSum<T>(pos);
+		} else if (type == Float.class) {
+			return new FloatSum<T>(pos);
+		} else if (type == Byte.class) {
+			return new ByteSum<T>(pos);
+		} else {
+			throw new RuntimeException("DataStream cannot be summed because the class "
+					+ type.getSimpleName() + " does not support the + operator.");
+		}
+
+	}
+
+	private static class IntSum<T> extends SumAggregationFunction<T> {
+		private static final long serialVersionUID = 1L;
+
+		public IntSum(int pos) {
+			super(pos);
+		}
+
+		@Override
+		protected Object add(Object value1, Object value2) {
+			return (Integer) value1 + (Integer) value2;
+		}
+	}
+
+	private static class LongSum<T> extends SumAggregationFunction<T> {
+		private static final long serialVersionUID = 1L;
+
+		public LongSum(int pos) {
+			super(pos);
+		}
+
+		@Override
+		protected Object add(Object value1, Object value2) {
+			return (Long) value1 + (Long) value2;
+		}
+	}
+
+	private static class DoubleSum<T> extends SumAggregationFunction<T> {
+
+		private static final long serialVersionUID = 1L;
+
+		public DoubleSum(int pos) {
+			super(pos);
+		}
+
+		@Override
+		protected Object add(Object value1, Object value2) {
+			return (Double) value1 + (Double) value2;
+		}
+	}
+
+	private static class ShortSum<T> extends SumAggregationFunction<T> {
+		private static final long serialVersionUID = 1L;
+
+		public ShortSum(int pos) {
+			super(pos);
+		}
+
+		@Override
+		protected Object add(Object value1, Object value2) {
+			return (Short) value1 + (Short) value2;
+		}
+	}
+
+	private static class FloatSum<T> extends SumAggregationFunction<T> {
+		private static final long serialVersionUID = 1L;
+
+		public FloatSum(int pos) {
+			super(pos);
+		}
+
+		@Override
+		protected Object add(Object value1, Object value2) {
+			return (Float) value1 + (Float) value2;
+		}
+	}
+
+	private static class ByteSum<T> extends SumAggregationFunction<T> {
+		private static final long serialVersionUID = 1L;
+
+		public ByteSum(int pos) {
+			super(pos);
+		}
+
+		@Override
+		protected Object add(Object value1, Object value2) {
+			return (Byte) value1 + (Byte) value2;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
index 95b3249..be6392e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
@@ -30,7 +30,6 @@ public class BatchGroupReduceInvokable<IN, OUT> extends BatchReduceInvokable<IN,
 	private static final long serialVersionUID = 1L;
 
 	int keyPosition;
-	protected GroupReduceFunction<IN, OUT> reducer;
 	private Iterator<StreamRecord<IN>> iterator;
 	private MutableTableState<Object, List<IN>> values;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
index dc27da4..05f888f 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
@@ -17,8 +17,9 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
+import java.io.Serializable;
 import java.util.Iterator;
 
-public interface BatchIterator<IN> extends Iterator<IN> {
+public interface BatchIterator<IN> extends Iterator<IN>, Serializable {
 	public void reset();
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
index 8b658f3..87b00f9 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
@@ -29,7 +29,6 @@ import org.apache.flink.streaming.state.MutableTableState;
 public class WindowGroupReduceInvokable<IN, OUT> extends WindowReduceInvokable<IN, OUT> {
 
 	int keyPosition;
-	protected GroupReduceFunction<IN, OUT> reducer;
 	private Iterator<StreamRecord<IN>> iterator;
 	private MutableTableState<Object, List<IN>> values;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index 7a502aa..3861aab 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -18,15 +18,16 @@
 package org.apache.flink.streaming.api;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.function.aggregation.StreamingMaxAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.StreamingMinAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.StreamingSumAggregationFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
 import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
 import org.apache.flink.streaming.util.MockInvokable;
@@ -36,26 +37,33 @@ public class AggregationFunctionTest {
 
 	@Test
 	public void groupSumIntegerTest() {
-		ArrayList<Tuple2<Integer, Integer>> inputList = new ArrayList<Tuple2<Integer, Integer>>();
 
 		List<Tuple2<Integer, Integer>> expectedSumList = new ArrayList<Tuple2<Integer, Integer>>();
 		List<Tuple2<Integer, Integer>> expectedMinList = new ArrayList<Tuple2<Integer, Integer>>();
 		List<Tuple2<Integer, Integer>> expectedMaxList = new ArrayList<Tuple2<Integer, Integer>>();
+		List<Integer> expectedSumList0 = new ArrayList<Integer>();
+		List<Integer> expectedMinList0 = new ArrayList<Integer>();
+		List<Integer> expectedMaxList0 = new ArrayList<Integer>();
 		List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<Tuple2<Integer, Integer>>();
 		List<Tuple2<Integer, Integer>> expectedGroupMinList = new ArrayList<Tuple2<Integer, Integer>>();
 		List<Tuple2<Integer, Integer>> expectedGroupMaxList = new ArrayList<Tuple2<Integer, Integer>>();
 
+		List<Integer> simpleInput = new ArrayList<Integer>();
+		
 		int groupedSum0 = 0;
 		int groupedSum1 = 0;
 		int groupedSum2 = 0;
 
 		for (int i = 0; i < 9; i++) {
-			inputList.add(new Tuple2<Integer, Integer>(i % 3, i));
-
+			simpleInput.add(i);
 			expectedSumList.add(new Tuple2<Integer, Integer>(i % 3, (i + 1) * i / 2));
 			expectedMinList.add(new Tuple2<Integer, Integer>(i % 3, 0));
 			expectedMaxList.add(new Tuple2<Integer, Integer>(i % 3, i));
 
+			expectedSumList0.add((i + 1) * i / 2);
+			expectedMaxList0.add(i);
+			expectedMinList0.add(0);
+
 			int groupedSum;
 			switch (i % 3) {
 			case 0:
@@ -74,30 +82,38 @@ public class AggregationFunctionTest {
 			expectedGroupMaxList.add(new Tuple2<Integer, Integer>(i % 3, i));
 		}
 
-		StreamingSumAggregationFunction<Tuple2<Integer, Integer>> sumFunction = new StreamingSumAggregationFunction<Tuple2<Integer, Integer>>(
-				1);
-		StreamingMinAggregationFunction<Tuple2<Integer, Integer>> minFunction = new StreamingMinAggregationFunction<Tuple2<Integer, Integer>>(
+		@SuppressWarnings("unchecked")
+		SumAggregationFunction<Tuple2<Integer, Integer>> sumFunction = SumAggregationFunction
+				.getSumFunction(1, Integer.class);
+		@SuppressWarnings("unchecked")
+		SumAggregationFunction<Integer> sumFunction0 = SumAggregationFunction
+				.getSumFunction(0, Integer.class);
+		MinAggregationFunction<Tuple2<Integer, Integer>> minFunction = new MinAggregationFunction<Tuple2<Integer, Integer>>(
 				1);
-		StreamingMaxAggregationFunction<Tuple2<Integer, Integer>> maxFunction = new StreamingMaxAggregationFunction<Tuple2<Integer, Integer>>(
+		MinAggregationFunction<Integer> minFunction0 = new MinAggregationFunction<Integer>(
+				0);
+		MaxAggregationFunction<Tuple2<Integer, Integer>> maxFunction = new MaxAggregationFunction<Tuple2<Integer, Integer>>(
 				1);
-
-		sumFunction.setType(TypeExtractor.getForObject(new Tuple2<Integer, Integer>(0, 0)));
-		minFunction.setType(TypeExtractor.getForObject(new Tuple2<Integer, Integer>(0, 0)));
-		maxFunction.setType(TypeExtractor.getForObject(new Tuple2<Integer, Integer>(0, 0)));
+		MaxAggregationFunction<Integer> maxFunction0 = new MaxAggregationFunction<Integer>(
+				0);
 
 		List<Tuple2<Integer, Integer>> sumList = MockInvokable.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(sumFunction), inputList);
+				new StreamReduceInvokable<Tuple2<Integer, Integer>>(sumFunction), getInputList());
+
 		List<Tuple2<Integer, Integer>> minList = MockInvokable.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minFunction), inputList);
+				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minFunction), getInputList());
+
 		List<Tuple2<Integer, Integer>> maxList = MockInvokable.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxFunction), inputList);
+				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxFunction), getInputList());
 
 		List<Tuple2<Integer, Integer>> groupedSumList = MockInvokable.createAndExecute(
-				new GroupReduceInvokable<Tuple2<Integer, Integer>>(sumFunction, 0), inputList);
+				new GroupReduceInvokable<Tuple2<Integer, Integer>>(sumFunction, 0), getInputList());
+
 		List<Tuple2<Integer, Integer>> groupedMinList = MockInvokable.createAndExecute(
-				new GroupReduceInvokable<Tuple2<Integer, Integer>>(minFunction, 0), inputList);
+				new GroupReduceInvokable<Tuple2<Integer, Integer>>(minFunction, 0), getInputList());
+
 		List<Tuple2<Integer, Integer>> groupedMaxList = MockInvokable.createAndExecute(
-				new GroupReduceInvokable<Tuple2<Integer, Integer>>(maxFunction, 0), inputList);
+				new GroupReduceInvokable<Tuple2<Integer, Integer>>(maxFunction, 0), getInputList());
 
 		assertEquals(expectedSumList, sumList);
 		assertEquals(expectedMinList, minList);
@@ -105,5 +121,39 @@ public class AggregationFunctionTest {
 		assertEquals(expectedGroupSumList, groupedSumList);
 		assertEquals(expectedGroupMinList, groupedMinList);
 		assertEquals(expectedGroupMaxList, groupedMaxList);
+		assertEquals(expectedSumList0, MockInvokable.createAndExecute(new StreamReduceInvokable<Integer>(sumFunction0),simpleInput ));
+		assertEquals(expectedMinList0, MockInvokable.createAndExecute(new StreamReduceInvokable<Integer>(minFunction0),simpleInput ));
+		assertEquals(expectedMaxList0, MockInvokable.createAndExecute(new StreamReduceInvokable<Integer>(maxFunction0),simpleInput ));
+
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+		try {
+			env.generateSequence(1, 100).min(1);
+			fail();
+		} catch (Exception e) {
+			//Nothing to do here
+		}
+		try {
+			env.generateSequence(1, 100).min(2);
+			fail();
+		} catch (Exception e) {
+			//Nothing to do here
+		}
+		try {
+			env.generateSequence(1, 100).min(3);
+			fail();
+		} catch (Exception e) {
+			//Nothing to do here
+		}
+
+	}
+
+	private List<Tuple2<Integer, Integer>> getInputList() {
+		ArrayList<Tuple2<Integer, Integer>> inputList = new ArrayList<Tuple2<Integer, Integer>>();
+		for (int i = 0; i < 9; i++) {
+			inputList.add(new Tuple2<Integer, Integer>(i % 3, i));
+		}
+		return inputList;
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcbebed0/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
index 0c4bad1..acf2f28 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
@@ -80,11 +80,5 @@ public class CoFlatMapTest implements Serializable {
 			// expected
 		}
 		
-		try {
-			env.fromElements(10, 11).connect(ds2);
-			fail();
-		} catch (RuntimeException e) {
-			// expected
-		}
 	}
 }