You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/11/09 20:08:45 UTC

[1/4] incubator-flink git commit: [FLINK-1224] [streaming] getExecutionEnvironment method fixed to work properly for StreamExecutionEnvironment

Repository: incubator-flink
Updated Branches:
  refs/heads/master e58049711 -> 34c13e978


[FLINK-1224] [streaming] getExecutionEnvironment method fixed to work properly for StreamExecutionEnvironment


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/34c13e97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/34c13e97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/34c13e97

Branch: refs/heads/master
Commit: 34c13e978c37f0be21393002eab7e24b81081a9f
Parents: 97d465b
Author: Gyula Fora <gy...@apache.org>
Authored: Fri Nov 7 23:05:20 2014 +0100
Committer: mbalassi <ba...@gmail.com>
Committed: Sun Nov 9 13:16:46 2014 +0100

----------------------------------------------------------------------
 .../api/environment/LocalStreamEnvironment.java | 10 ++++--
 .../environment/StreamExecutionEnvironment.java |  8 +++--
 .../flink/streaming/util/ClusterUtil.java       | 36 ++++++++++++++++++--
 .../client/program/ContextEnvironment.java      |  4 +++
 4 files changed, 51 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34c13e97/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 94e0891..5c0f555 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -39,8 +39,14 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 */
 	@Override
 	public void execute(String jobName) throws Exception {
-		ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(jobName),
-				getExecutionParallelism());
+		if (localExecutionIsAllowed()) {
+			ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(jobName),
+					getExecutionParallelism());
+		} else {
+			ClusterUtil.runOnLocalCluster(this.jobGraphBuilder.getJobGraph(jobName),
+					getExecutionParallelism());
+		}
+
 	}
 
 	public void executeTest(long memorySize) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34c13e97/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 4b7afbb..0d00db3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -123,14 +123,15 @@ public abstract class StreamExecutionEnvironment {
 	 * <li>
 	 * 0 triggers flushing after every record thus minimizing latency</li>
 	 * <li>
-	 * -1 triggers flushing only when the output buffer is full thus maximizing throughput</li>
+	 * -1 triggers flushing only when the output buffer is full thus maximizing
+	 * throughput</li>
 	 * </ul>
 	 * 
 	 * @param timeoutMillis
 	 *            The maximum time between two output flushes.
 	 */
 	public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis) {
-		if (timeoutMillis < -1 ) {
+		if (timeoutMillis < -1) {
 			throw new IllegalArgumentException("Timeout of buffer must be non-negative or -1");
 		}
 
@@ -350,7 +351,8 @@ public abstract class StreamExecutionEnvironment {
 	 *         executed.
 	 */
 	public static StreamExecutionEnvironment getExecutionEnvironment() {
-		return contextEnvironment == null ? createLocalEnvironment() : contextEnvironment;
+		allowLocalExecution = ExecutionEnvironment.localExecutionIsAllowed();
+		return createLocalEnvironment();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34c13e97/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index f9c6b25..278cb5a 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -19,8 +19,10 @@ package org.apache.flink.streaming.util;
 
 import java.net.InetSocketAddress;
 
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.minicluster.NepheleMiniCluster;
 import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -42,7 +44,8 @@ public class ClusterUtil {
 	 * @param memorySize
 	 *            memorySize
 	 */
-	public static void runOnMiniCluster(JobGraph jobGraph, int degreeOfPrallelism, long memorySize) throws Exception  {
+	public static void runOnMiniCluster(JobGraph jobGraph, int degreeOfPrallelism, long memorySize)
+			throws Exception {
 
 		Configuration configuration = jobGraph.getJobConfiguration();
 
@@ -76,7 +79,36 @@ public class ClusterUtil {
 		}
 	}
 
-	public static void runOnMiniCluster(JobGraph jobGraph, int numberOfTaskTrackers) throws Exception {
+	public static void runOnLocalCluster(JobGraph jobGraph, int degreeOfPrallelism)
+			throws Exception {
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Running on mini cluster");
+		}
+
+		try {
+
+			Client client = ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
+					.getClient();
+
+			client.run(jobGraph, true);
+		} catch (ProgramInvocationException e) {
+			if (e.getMessage().contains("GraphConversionException")) {
+				throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);
+			} else {
+				throw e;
+			}
+		} catch (Exception e) {
+			throw e;
+		} finally {
+			try {
+			} catch (Throwable t) {
+			}
+		}
+	}
+
+	public static void runOnMiniCluster(JobGraph jobGraph, int numberOfTaskTrackers)
+			throws Exception {
 		runOnMiniCluster(jobGraph, numberOfTaskTrackers, -1);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34c13e97/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 393abc4..4f91514 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -79,4 +79,8 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	public static void disableLocalExecution() {
 		ExecutionEnvironment.disableLocalExecution();
 	}
+	
+	public Client getClient() {
+		return this.client;
+	}
 }


[2/4] incubator-flink git commit: [streaming] Aggregation rework to support field expression based aggregations for Pojo data streams

Posted by mb...@apache.org.
[streaming] Aggregation rework to support field expression based aggregations for Pojo data streams


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/7ae58042
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/7ae58042
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/7ae58042

Branch: refs/heads/master
Commit: 7ae58042787bf307304d59182ba1f0825a480917
Parents: c9f3846
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Nov 5 16:23:22 2014 +0100
Committer: mbalassi <ba...@gmail.com>
Committed: Sun Nov 9 13:16:46 2014 +0100

----------------------------------------------------------------------
 .../api/datastream/BatchedDataStream.java       | 118 +++++++--
 .../streaming/api/datastream/DataStream.java    | 118 ++++++++-
 .../aggregation/AggregationFunction.java        |  17 +-
 .../ComparableAggregationFunction.java          |  83 -------
 .../aggregation/ComparableAggregator.java       | 243 +++++++++++++++++++
 .../api/function/aggregation/Comparator.java    | 104 ++++++++
 .../aggregation/MaxAggregationFunction.java     |  34 ---
 .../aggregation/MaxByAggregationFunction.java   |  39 ---
 .../aggregation/MinAggregationFunction.java     |  34 ---
 .../aggregation/MinByAggregationFunction.java   |  70 ------
 .../aggregation/SumAggregationFunction.java     | 159 ------------
 .../api/function/aggregation/SumAggregator.java | 173 +++++++++++++
 .../api/function/aggregation/SumFunction.java   | 102 ++++++++
 .../streaming/api/AggregationFunctionTest.java  |  80 +++---
 14 files changed, 877 insertions(+), 497 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7ae58042/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
index 86cb90b..c8a49c6 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
@@ -24,11 +24,9 @@ import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.MaxByAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.MinByAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
+import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
@@ -151,11 +149,10 @@ public class BatchedDataStream<OUT> {
 	 *            The position in the data point to sum
 	 * @return The transformed DataStream.
 	 */
-	@SuppressWarnings("unchecked")
 	public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
 		dataStream.checkFieldRange(positionToSum);
-		return aggregate((AggregationFunction<OUT>) SumAggregationFunction.getSumFunction(
-				positionToSum, dataStream.getClassAtPos(positionToSum), dataStream.getOutputType()));
+		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(positionToSum,
+				dataStream.getClassAtPos(positionToSum), dataStream.getOutputType()));
 	}
 
 	/**
@@ -168,6 +165,23 @@ public class BatchedDataStream<OUT> {
 	}
 
 	/**
+	 * Applies an aggregation that that gives the sum of the pojo data stream at
+	 * the given field expression. A field expression is either the name of a
+	 * public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> sum(String field) {
+		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field,
+				getOutputType()));
+	}
+
+	/**
 	 * Applies an aggregation that that gives the minimum of every sliding
 	 * batch/window of the data stream at the given position.
 	 * 
@@ -177,7 +191,8 @@ public class BatchedDataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
 		dataStream.checkFieldRange(positionToMin);
-		return aggregate(new MinAggregationFunction<OUT>(positionToMin, dataStream.getOutputType()));
+		return aggregate(ComparableAggregator.getAggregator(positionToMin, getOutputType(),
+				AggregationType.MIN));
 	}
 
 	/**
@@ -209,8 +224,8 @@ public class BatchedDataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) {
 		dataStream.checkFieldRange(positionToMinBy);
-		return aggregate(new MinByAggregationFunction<OUT>(positionToMinBy, first,
-				dataStream.getOutputType()));
+		return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getOutputType(),
+				AggregationType.MINBY, first));
 	}
 
 	/**
@@ -232,7 +247,8 @@ public class BatchedDataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
 		dataStream.checkFieldRange(positionToMax);
-		return aggregate(new MaxAggregationFunction<OUT>(positionToMax, dataStream.getOutputType()));
+		return aggregate(ComparableAggregator.getAggregator(positionToMax, getOutputType(),
+				AggregationType.MAX));
 	}
 
 	/**
@@ -263,8 +279,8 @@ public class BatchedDataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
 		dataStream.checkFieldRange(positionToMaxBy);
-		return aggregate(new MaxByAggregationFunction<OUT>(positionToMaxBy, first,
-				dataStream.getOutputType()));
+		return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getOutputType(),
+				AggregationType.MAXBY, first));
 	}
 
 	/**
@@ -277,6 +293,80 @@ public class BatchedDataStream<OUT> {
 	}
 
 	/**
+	 * Applies an aggregation that that gives the minimum of the pojo data
+	 * stream at the given field expression. A field expression is either the
+	 * name of a public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> min(String field) {
+		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+				AggregationType.MIN, false));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the maximum of the pojo data
+	 * stream at the given field expression. A field expression is either the
+	 * name of a public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> max(String field) {
+		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+				AggregationType.MAX, false));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the minimum element of the pojo
+	 * data stream by the given field expression. A field expression is either
+	 * the name of a public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @param first
+	 *            If True then in case of field equality the first object will
+	 *            be returned
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
+		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+				AggregationType.MINBY, first));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the maximum element of the pojo
+	 * data stream by the given field expression. A field expression is either
+	 * the name of a public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @param first
+	 *            If True then in case of field equality the first object will
+	 *            be returned
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) {
+		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+				AggregationType.MAXBY, first));
+	}
+
+	/**
 	 * Gets the output type.
 	 * 
 	 * @return The output type.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7ae58042/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index b50c42d..991b6d7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -42,11 +42,9 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.MaxByAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.MinByAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
+import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
@@ -763,11 +761,27 @@ public class DataStream<OUT> {
 	 *            The position in the data point to sum
 	 * @return The transformed DataStream.
 	 */
-	@SuppressWarnings("unchecked")
 	public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
 		checkFieldRange(positionToSum);
-		return aggregate((AggregationFunction<OUT>) SumAggregationFunction.getSumFunction(
-				positionToSum, getClassAtPos(positionToSum), getOutputType()));
+		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(positionToSum,
+				getClassAtPos(positionToSum), getOutputType()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the sum of the pojo data stream at
+	 * the given field expression. A field expression is either the name of a
+	 * public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> sum(String field) {
+		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field,
+				getOutputType()));
 	}
 
 	/**
@@ -789,7 +803,82 @@ public class DataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
 		checkFieldRange(positionToMin);
-		return aggregate(new MinAggregationFunction<OUT>(positionToMin, getOutputType()));
+		return aggregate(ComparableAggregator.getAggregator(positionToMin, getOutputType(),
+				AggregationType.MIN));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the minimum of the pojo data
+	 * stream at the given field expression. A field expression is either the
+	 * name of a public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> min(String field) {
+		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+				AggregationType.MIN, false));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the maximum of the pojo data
+	 * stream at the given field expression. A field expression is either the
+	 * name of a public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> max(String field) {
+		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+				AggregationType.MAX, false));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the minimum element of the pojo
+	 * data stream by the given field expression. A field expression is either
+	 * the name of a public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @param first
+	 *            If True then in case of field equality the first object will
+	 *            be returned
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
+		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+				AggregationType.MINBY, first));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the maximum element of the pojo
+	 * data stream by the given field expression. A field expression is either
+	 * the name of a public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @param first
+	 *            If True then in case of field equality the first object will
+	 *            be returned
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) {
+		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+				AggregationType.MAXBY, first));
 	}
 
 	/**
@@ -821,7 +910,8 @@ public class DataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) {
 		checkFieldRange(positionToMinBy);
-		return aggregate(new MinByAggregationFunction<OUT>(positionToMinBy, first, getOutputType()));
+		return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getOutputType(),
+				AggregationType.MINBY, first));
 	}
 
 	/**
@@ -843,7 +933,8 @@ public class DataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
 		checkFieldRange(positionToMax);
-		return aggregate(new MaxAggregationFunction<OUT>(positionToMax, getOutputType()));
+		return aggregate(ComparableAggregator.getAggregator(positionToMax, getOutputType(),
+				AggregationType.MAX));
 	}
 
 	/**
@@ -875,7 +966,8 @@ public class DataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
 		checkFieldRange(positionToMaxBy);
-		return aggregate(new MaxByAggregationFunction<OUT>(positionToMaxBy, first, getOutputType()));
+		return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getOutputType(),
+				AggregationType.MAXBY, first));
 	}
 
 	/**
@@ -888,7 +980,7 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Applies an aggregation that gives the count of the data point.
+	 * Applies an aggregation that gives the count of the values.
 	 * 
 	 * @return The transformed DataStream.
 	 */

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7ae58042/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java
index 825b4db..d95c37e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java
@@ -18,23 +18,18 @@
 package org.apache.flink.streaming.api.function.aggregation;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
 
 public abstract class AggregationFunction<T> implements ReduceFunction<T> {
 	private static final long serialVersionUID = 1L;
 
-	public int position;
-	protected Tuple returnTuple;
-	protected boolean isTuple;
-	protected boolean isArray;
+	int position;
 
-	public AggregationFunction(int pos, TypeInformation<?> type) {
+	public AggregationFunction(int pos) {
 		this.position = pos;
-		this.isTuple = type.isTupleType();
-		this.isArray = type instanceof BasicArrayTypeInfo || type instanceof PrimitiveArrayTypeInfo;
+	}
+
+	public static enum AggregationType {
+		SUM, MIN, MAX, MINBY, MAXBY,
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7ae58042/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java
deleted file mode 100644
index 383c39c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.aggregation;
-
-import java.lang.reflect.Array;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-
-public abstract class ComparableAggregationFunction<T> extends AggregationFunction<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	public ComparableAggregationFunction(int positionToAggregate, TypeInformation<?> type) {
-		super(positionToAggregate, type);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public T reduce(T value1, T value2) throws Exception {
-		if (isTuple) {
-			Tuple t1 = (Tuple) value1;
-			Tuple t2 = (Tuple) value2;
-
-			compare(t1, t2);
-
-			return (T) returnTuple;
-		} else if (isArray) {
-			return compareArray(value1, value2);
-		} else if (value1 instanceof Comparable) {
-			if (isExtremal((Comparable<Object>) value1, value2)) {
-				return value1;
-			} else {
-				return value2;
-			}
-		} else {
-			throw new RuntimeException("The values " + value1 + " and " + value2
-					+ " cannot be compared.");
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	public T compareArray(T array1, T array2) {
-		Object v1 = Array.get(array1, position);
-		Object v2 = Array.get(array2, position);
-		if (isExtremal((Comparable<Object>) v1, v2)) {
-			Array.set(array2, position, v1);
-		} else {
-			Array.set(array2, position, v2);
-		}
-
-		return array2;
-	}
-
-	public <R> void compare(Tuple tuple1, Tuple tuple2) throws InstantiationException,
-			IllegalAccessException {
-
-		Comparable<R> o1 = tuple1.getField(position);
-		R o2 = tuple2.getField(position);
-
-		if (isExtremal(o1, o2)) {
-			tuple2.setField(o1, position);
-		}
-		returnTuple = tuple2;
-	}
-
-	public abstract <R> boolean isExtremal(Comparable<R> o1, R o2);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7ae58042/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
new file mode 100644
index 0000000..6e2a400
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
+
+public abstract class ComparableAggregator<T> extends AggregationFunction<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	Comparator comparator;
+	boolean byAggregate;
+	boolean first;
+
+	private ComparableAggregator(int pos, AggregationType aggregationType, boolean first) {
+		super(pos);
+		this.comparator = Comparator.getForAggregation(aggregationType);
+		this.byAggregate = (aggregationType == AggregationType.MAXBY)
+				|| (aggregationType == AggregationType.MINBY);
+		this.first = first;
+	}
+
+	public static <R> AggregationFunction<R> getAggregator(int positionToAggregate,
+			TypeInformation<R> typeInfo, AggregationType aggregationType) {
+		return getAggregator(positionToAggregate, typeInfo, aggregationType, false);
+	}
+
+	public static <R> AggregationFunction<R> getAggregator(int positionToAggregate,
+			TypeInformation<R> typeInfo, AggregationType aggregationType, boolean first) {
+
+		if (typeInfo.isTupleType()) {
+			return new TupleComparableAggregator<R>(positionToAggregate, aggregationType, first);
+		} else if (typeInfo instanceof BasicArrayTypeInfo
+				|| typeInfo instanceof PrimitiveArrayTypeInfo) {
+			return new ArrayComparableAggregator<R>(positionToAggregate, aggregationType, first);
+		} else {
+			return new SimpleComparableAggregator<R>(aggregationType);
+		}
+	}
+
+	public static <R> AggregationFunction<R> getAggregator(String field,
+			TypeInformation<R> typeInfo, AggregationType aggregationType, boolean first) {
+
+		return new PojoComparableAggregator<R>(field, typeInfo, aggregationType, first);
+	}
+
+	private static class TupleComparableAggregator<T> extends ComparableAggregator<T> {
+
+		private static final long serialVersionUID = 1L;
+
+		public TupleComparableAggregator(int pos, AggregationType aggregationType, boolean first) {
+			super(pos, aggregationType, first);
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public T reduce(T value1, T value2) throws Exception {
+			Tuple tuple1 = (Tuple) value1;
+			Tuple tuple2 = (Tuple) value2;
+
+			Comparable<Object> o1 = tuple1.getField(position);
+			Object o2 = tuple2.getField(position);
+
+			int c = comparator.isExtremal(o1, o2);
+
+			if (byAggregate) {
+				if (c == 1) {
+					return (T) tuple1;
+				}
+				if (first) {
+					if (c == 0) {
+						return (T) tuple1;
+					}
+				}
+
+				return (T) tuple2;
+
+			} else {
+				if (c == 1) {
+					tuple2.setField(o1, position);
+				}
+				return (T) tuple2;
+			}
+
+		}
+	}
+
+	private static class ArrayComparableAggregator<T> extends ComparableAggregator<T> {
+
+		private static final long serialVersionUID = 1L;
+
+		public ArrayComparableAggregator(int pos, AggregationType aggregationType, boolean first) {
+			super(pos, aggregationType, first);
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public T reduce(T array1, T array2) throws Exception {
+
+			Object v1 = Array.get(array1, position);
+			Object v2 = Array.get(array2, position);
+
+			int c = comparator.isExtremal((Comparable<Object>) v1, v2);
+
+			if (byAggregate) {
+				if (c == 1) {
+					return array1;
+				}
+				if (first) {
+					if (c == 0) {
+						return array1;
+					}
+				}
+
+				return array2;
+			} else {
+				if (c == 1) {
+					Array.set(array2, position, v1);
+				} else {
+					Array.set(array2, position, v2);
+				}
+
+				return array2;
+			}
+		}
+
+	}
+
+	private static class SimpleComparableAggregator<T> extends ComparableAggregator<T> {
+
+		private static final long serialVersionUID = 1L;
+
+		public SimpleComparableAggregator(AggregationType aggregationType) {
+			super(0, aggregationType, false);
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public T reduce(T value1, T value2) throws Exception {
+
+			if (comparator.isExtremal((Comparable<Object>) value1, value2) == 1) {
+				return value1;
+			} else {
+				return value2;
+			}
+		}
+
+	}
+
+	private static class PojoComparableAggregator<T> extends ComparableAggregator<T> {
+
+		private static final long serialVersionUID = 1L;
+		PojoComparator<T> pojoComparator;
+
+		public PojoComparableAggregator(String field, TypeInformation<?> typeInfo,
+				AggregationType aggregationType, boolean first) {
+			super(0, aggregationType, first);
+			if (!(typeInfo instanceof CompositeType<?>)) {
+				throw new IllegalArgumentException(
+						"Key expressions are only supported on POJO types and Tuples. "
+								+ "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
+			}
+
+			@SuppressWarnings("unchecked")
+			CompositeType<T> cType = (CompositeType<T>) typeInfo;
+
+			List<FlatFieldDescriptor> fieldDescriptors = new ArrayList<FlatFieldDescriptor>();
+			cType.getKey(field, 0, fieldDescriptors);
+
+			int logicalKeyPosition = fieldDescriptors.get(0).getPosition();
+
+			if (cType instanceof PojoTypeInfo) {
+				pojoComparator = (PojoComparator<T>) cType.createComparator(
+						new int[] { logicalKeyPosition }, new boolean[] { false }, 0);
+			} else {
+				throw new IllegalArgumentException(
+						"Key expressions are only supported on POJO types. "
+								+ "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
+			}
+		}
+
+		@Override
+		public T reduce(T value1, T value2) throws Exception {
+
+			Field[] keyFields = pojoComparator.getKeyFields();
+			Object field1 = pojoComparator.accessField(keyFields[0], value1);
+			Object field2 = pojoComparator.accessField(keyFields[0], value2);
+
+			@SuppressWarnings("unchecked")
+			int c = comparator.isExtremal((Comparable<Object>) field1, field2);
+
+			if (byAggregate) {
+				if (c == 1) {
+					return value1;
+				}
+				if (first) {
+					if (c == 0) {
+						return value1;
+					}
+				}
+
+				return value2;
+			} else {
+				if (c == 1) {
+					keyFields[0].set(value2, field1);
+				} else {
+					keyFields[0].set(value2, field2);
+				}
+
+				return value2;
+			}
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7ae58042/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/Comparator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/Comparator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/Comparator.java
new file mode 100644
index 0000000..f56774b
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/Comparator.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+import java.io.Serializable;
+
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
+
+public abstract class Comparator implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	public abstract <R> int isExtremal(Comparable<R> o1, R o2);
+
+	public static Comparator getForAggregation(AggregationType type) {
+		switch (type) {
+		case MAX:
+			return new MaxComparator();
+		case MIN:
+			return new MinComparator();
+		case MINBY:
+			return new MinByComparator();
+		case MAXBY:
+			return new MaxByComparator();
+		default:
+			throw new IllegalArgumentException("Unsupported aggregation type.");
+		}
+	}
+
+	private static class MaxComparator extends Comparator {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public <R> int isExtremal(Comparable<R> o1, R o2) {
+			return o1.compareTo(o2) > 0 ? 1 : 0;
+		}
+
+	}
+
+	private static class MaxByComparator extends Comparator {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public <R> int isExtremal(Comparable<R> o1, R o2) {
+			int c = o1.compareTo(o2);
+			if (c > 0) {
+				return 1;
+			}
+			if (c == 0) {
+				return 0;
+			} else {
+				return -1;
+			}
+		}
+
+	}
+
+	private static class MinByComparator extends Comparator {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public <R> int isExtremal(Comparable<R> o1, R o2) {
+			int c = o1.compareTo(o2);
+			if (c < 0) {
+				return 1;
+			}
+			if (c == 0) {
+				return 0;
+			} else {
+				return -1;
+			}
+		}
+
+	}
+
+	private static class MinComparator extends Comparator {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public <R> int isExtremal(Comparable<R> o1, R o2) {
+			return o1.compareTo(o2) < 0 ? 1 : 0;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7ae58042/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxAggregationFunction.java
deleted file mode 100644
index d013162..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxAggregationFunction.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.aggregation;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-public class MaxAggregationFunction<T> extends ComparableAggregationFunction<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	public MaxAggregationFunction(int pos, TypeInformation<?> type) {
-		super(pos, type);
-	}
-
-	@Override
-	public <R> boolean isExtremal(Comparable<R> o1, R o2) {
-		return o1.compareTo(o2) > 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7ae58042/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxByAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxByAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxByAggregationFunction.java
deleted file mode 100644
index 4679028..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxByAggregationFunction.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.aggregation;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-public class MaxByAggregationFunction<T> extends MinByAggregationFunction<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	public MaxByAggregationFunction(int pos, boolean first, TypeInformation<?> type) {
-		super(pos, first, type);
-	}
-
-	@Override
-	public <R> boolean isExtremal(Comparable<R> o1, R o2) {
-		if (first) {
-			return o1.compareTo(o2) >= 0;
-		} else {
-			return o1.compareTo(o2) > 0;
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7ae58042/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinAggregationFunction.java
deleted file mode 100644
index 83c20c7..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinAggregationFunction.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.aggregation;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-public class MinAggregationFunction<T> extends ComparableAggregationFunction<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	public MinAggregationFunction(int pos, TypeInformation<?> type) {
-		super(pos, type);
-	}
-
-	@Override
-	public <R> boolean isExtremal(Comparable<R> o1, R o2) {
-		return o1.compareTo(o2) < 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7ae58042/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinByAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinByAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinByAggregationFunction.java
deleted file mode 100644
index 31d6b37..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinByAggregationFunction.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.aggregation;
-
-import java.lang.reflect.Array;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-
-public class MinByAggregationFunction<T> extends ComparableAggregationFunction<T> {
-
-	private static final long serialVersionUID = 1L;
-	protected boolean first;
-
-	public MinByAggregationFunction(int pos, boolean first, TypeInformation<?> type) {
-		super(pos, type);
-		this.first = first;
-	}
-
-	@Override
-	public <R> void compare(Tuple tuple1, Tuple tuple2) throws InstantiationException,
-			IllegalAccessException {
-
-		Comparable<R> o1 = tuple1.getField(position);
-		R o2 = tuple2.getField(position);
-
-		if (isExtremal(o1, o2)) {
-			returnTuple = tuple1;
-		} else {
-			returnTuple = tuple2;
-		}
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public T compareArray(T array1, T array2) {
-		Object v1 = Array.get(array1, position);
-		Object v2 = Array.get(array2, position);
-		if (isExtremal((Comparable<Object>) v1, v2)) {
-			return array1;
-		} else {
-			return array2;
-		}
-	}
-
-	@Override
-	public <R> boolean isExtremal(Comparable<R> o1, R o2) {
-		if (first) {
-			return o1.compareTo(o2) <= 0;
-		} else {
-			return o1.compareTo(o2) < 0;
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7ae58042/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregationFunction.java
deleted file mode 100644
index cd50072..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregationFunction.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.aggregation;
-
-import java.lang.reflect.Array;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-
-public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	public SumAggregationFunction(int pos, TypeInformation<?> type) {
-		super(pos, type);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public T reduce(T value1, T value2) throws Exception {
-		if (isTuple) {
-			Tuple tuple1 = (Tuple) value1;
-			Tuple tuple2 = (Tuple) value2;
-
-			returnTuple = tuple2;
-			returnTuple.setField(add(tuple1.getField(position), tuple2.getField(position)),
-					position);
-
-			return (T) returnTuple;
-		} else if (isArray) {
-			Object v1 = Array.get(value1, position);
-			Object v2 = Array.get(value2, position);
-			Array.set(value2, position, add(v1, v2));
-			return value2;
-		} else {
-			return (T) add(value1, value2);
-		}
-	}
-
-	protected abstract Object add(Object value1, Object value2);
-
-	@SuppressWarnings("rawtypes")
-	public static <T> SumAggregationFunction getSumFunction(int pos, Class<T> classAtPos,
-			TypeInformation<?> typeInfo) {
-
-		if (classAtPos == Integer.class) {
-			return new IntSum<T>(pos, typeInfo);
-		} else if (classAtPos == Long.class) {
-			return new LongSum<T>(pos, typeInfo);
-		} else if (classAtPos == Short.class) {
-			return new ShortSum<T>(pos, typeInfo);
-		} else if (classAtPos == Double.class) {
-			return new DoubleSum<T>(pos, typeInfo);
-		} else if (classAtPos == Float.class) {
-			return new FloatSum<T>(pos, typeInfo);
-		} else if (classAtPos == Byte.class) {
-			return new ByteSum<T>(pos, typeInfo);
-		} else {
-			throw new RuntimeException("DataStream cannot be summed because the class "
-					+ classAtPos.getSimpleName() + " does not support the + operator.");
-		}
-
-	}
-
-	private static class IntSum<T> extends SumAggregationFunction<T> {
-		private static final long serialVersionUID = 1L;
-
-		public IntSum(int pos, TypeInformation<?> type) {
-			super(pos, type);
-		}
-
-		@Override
-		protected Object add(Object value1, Object value2) {
-			return (Integer) value1 + (Integer) value2;
-		}
-	}
-
-	private static class LongSum<T> extends SumAggregationFunction<T> {
-		private static final long serialVersionUID = 1L;
-
-		public LongSum(int pos, TypeInformation<?> type) {
-			super(pos, type);
-		}
-
-		@Override
-		protected Object add(Object value1, Object value2) {
-			return (Long) value1 + (Long) value2;
-		}
-	}
-
-	private static class DoubleSum<T> extends SumAggregationFunction<T> {
-
-		private static final long serialVersionUID = 1L;
-
-		public DoubleSum(int pos, TypeInformation<?> type) {
-			super(pos, type);
-		}
-
-		@Override
-		protected Object add(Object value1, Object value2) {
-			return (Double) value1 + (Double) value2;
-		}
-	}
-
-	private static class ShortSum<T> extends SumAggregationFunction<T> {
-		private static final long serialVersionUID = 1L;
-
-		public ShortSum(int pos, TypeInformation<?> type) {
-			super(pos, type);
-		}
-
-		@Override
-		protected Object add(Object value1, Object value2) {
-			return (Short) value1 + (Short) value2;
-		}
-	}
-
-	private static class FloatSum<T> extends SumAggregationFunction<T> {
-		private static final long serialVersionUID = 1L;
-
-		public FloatSum(int pos, TypeInformation<?> type) {
-			super(pos, type);
-		}
-
-		@Override
-		protected Object add(Object value1, Object value2) {
-			return (Float) value1 + (Float) value2;
-		}
-	}
-
-	private static class ByteSum<T> extends SumAggregationFunction<T> {
-		private static final long serialVersionUID = 1L;
-
-		public ByteSum(int pos, TypeInformation<?> type) {
-			super(pos, type);
-		}
-
-		@Override
-		protected Object add(Object value1, Object value2) {
-			return (Byte) value1 + (Byte) value2;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7ae58042/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
new file mode 100644
index 0000000..384b4f6
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
+
+public abstract class SumAggregator {
+
+	public static <T> ReduceFunction<T> getSumFunction(int pos, Class<?> clazz,
+			TypeInformation<T> typeInfo) {
+
+		if (typeInfo.isTupleType()) {
+			return new TupleSumAggregator<T>(pos, SumFunction.getForClass(clazz));
+		} else if (typeInfo instanceof BasicArrayTypeInfo
+				|| typeInfo instanceof PrimitiveArrayTypeInfo) {
+			return new ArraySumAggregator<T>(pos, SumFunction.getForClass(clazz));
+		} else {
+			return new SimpleSumAggregator<T>(SumFunction.getForClass(clazz));
+		}
+
+	}
+
+	public static <T> ReduceFunction<T> getSumFunction(String field, TypeInformation<T> typeInfo) {
+
+		return new PojoSumAggregator<T>(field, typeInfo);
+	}
+
+	private static class TupleSumAggregator<T> extends AggregationFunction<T> {
+
+		private static final long serialVersionUID = 1L;
+
+		SumFunction adder;
+
+		public TupleSumAggregator(int pos, SumFunction adder) {
+			super(pos);
+			this.adder = adder;
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public T reduce(T value1, T value2) throws Exception {
+
+			Tuple tuple1 = (Tuple) value1;
+			Tuple tuple2 = (Tuple) value2;
+
+			tuple2.setField(adder.add(tuple1.getField(position), tuple2.getField(position)),
+					position);
+
+			return (T) tuple2;
+		}
+
+	}
+
+	private static class ArraySumAggregator<T> extends AggregationFunction<T> {
+
+		private static final long serialVersionUID = 1L;
+
+		SumFunction adder;
+
+		public ArraySumAggregator(int pos, SumFunction adder) {
+			super(pos);
+			this.adder = adder;
+		}
+
+		@Override
+		public T reduce(T value1, T value2) throws Exception {
+
+			Object v1 = Array.get(value1, position);
+			Object v2 = Array.get(value2, position);
+			Array.set(value2, position, adder.add(v1, v2));
+			return value2;
+		}
+
+	}
+
+	private static class SimpleSumAggregator<T> extends AggregationFunction<T> {
+
+		private static final long serialVersionUID = 1L;
+
+		SumFunction adder;
+
+		public SimpleSumAggregator(SumFunction adder) {
+			super(0);
+			this.adder = adder;
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public T reduce(T value1, T value2) throws Exception {
+
+			return (T) adder.add(value1, value2);
+		}
+
+	}
+
+	private static class PojoSumAggregator<T> extends AggregationFunction<T> {
+
+		private static final long serialVersionUID = 1L;
+		SumFunction adder;
+		PojoComparator<T> comparator;
+
+		public PojoSumAggregator(String field, TypeInformation<?> type) {
+			super(0);
+			if (!(type instanceof CompositeType<?>)) {
+				throw new IllegalArgumentException(
+						"Key expressions are only supported on POJO types and Tuples. "
+								+ "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
+			}
+
+			@SuppressWarnings("unchecked")
+			CompositeType<T> cType = (CompositeType<T>) type;
+
+			List<FlatFieldDescriptor> fieldDescriptors = new ArrayList<FlatFieldDescriptor>();
+			cType.getKey(field, 0, fieldDescriptors);
+
+			int logicalKeyPosition = fieldDescriptors.get(0).getPosition();
+			Class<?> keyClass = fieldDescriptors.get(0).getType().getTypeClass();
+
+			adder = SumFunction.getForClass(keyClass);
+
+			if (cType instanceof PojoTypeInfo) {
+				comparator = (PojoComparator<T>) cType.createComparator(
+						new int[] { logicalKeyPosition }, new boolean[] { false }, 0);
+			} else {
+				throw new IllegalArgumentException(
+						"Key expressions are only supported on POJO types. "
+								+ "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
+			}
+		}
+
+		@Override
+		public T reduce(T value1, T value2) throws Exception {
+
+			Field[] keyFields = comparator.getKeyFields();
+			Object field1 = comparator.accessField(keyFields[0], value1);
+			Object field2 = comparator.accessField(keyFields[0], value2);
+
+			keyFields[0].set(value2, adder.add(field1, field2));
+
+			return value2;
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7ae58042/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java
new file mode 100644
index 0000000..1ac236d
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.aggregation;
+
+import java.io.Serializable;
+
+public abstract class SumFunction implements Serializable{
+
+	private static final long serialVersionUID = 1L;
+
+	public abstract Object add(Object o1, Object o2);
+
+	public static SumFunction getForClass(Class<?> clazz) {
+
+		if (clazz == Integer.class) {
+			return new IntSum();
+		} else if (clazz == Long.class) {
+			return new LongSum();
+		} else if (clazz == Short.class) {
+			return new ShortSum();
+		} else if (clazz == Double.class) {
+			return new DoubleSum();
+		} else if (clazz == Float.class) {
+			return new FloatSum();
+		} else if (clazz == Byte.class) {
+			return new ByteSum();
+		} else {
+			throw new RuntimeException("DataStream cannot be summed because the class "
+					+ clazz.getSimpleName() + " does not support the + operator.");
+		}
+	}
+
+	private static class IntSum extends SumFunction {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Object add(Object value1, Object value2) {
+			return (Integer) value1 + (Integer) value2;
+		}
+	}
+
+	private static class LongSum extends SumFunction {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Object add(Object value1, Object value2) {
+			return (Long) value1 + (Long) value2;
+		}
+	}
+
+	private static class DoubleSum extends SumFunction {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Object add(Object value1, Object value2) {
+			return (Double) value1 + (Double) value2;
+		}
+	}
+
+	private static class ShortSum extends SumFunction {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Object add(Object value1, Object value2) {
+			return (Short) value1 + (Short) value2;
+		}
+	}
+
+	private static class FloatSum extends SumFunction {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Object add(Object value1, Object value2) {
+			return (Float) value1 + (Float) value2;
+		}
+	}
+
+	private static class ByteSum extends SumFunction {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Object add(Object value1, Object value2) {
+			return (Byte) value1 + (Byte) value2;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7ae58042/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index 95c6f71..0fbf72a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -23,15 +23,14 @@ import static org.junit.Assert.fail;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.MaxByAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.MinByAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
+import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
 import org.apache.flink.streaming.util.MockInvokable;
@@ -87,22 +86,22 @@ public class AggregationFunctionTest {
 			expectedGroupMaxList.add(new Tuple2<Integer, Integer>(i % 3, i));
 		}
 
-		TypeInformation<?> type1 = TypeExtractor.getForObject(new Tuple2<Integer, Integer>(0, 0));
-		TypeInformation<?> type2 = TypeExtractor.getForObject(2);
-
-		@SuppressWarnings("unchecked")
-		SumAggregationFunction<Tuple2<Integer, Integer>> sumFunction = SumAggregationFunction
-				.getSumFunction(1, Integer.class, type1);
-		@SuppressWarnings("unchecked")
-		SumAggregationFunction<Integer> sumFunction0 = SumAggregationFunction.getSumFunction(0,
-				Integer.class, type2);
-		MinAggregationFunction<Tuple2<Integer, Integer>> minFunction = new MinAggregationFunction<Tuple2<Integer, Integer>>(
-				1, type1);
-		MinAggregationFunction<Integer> minFunction0 = new MinAggregationFunction<Integer>(0, type2);
-		MaxAggregationFunction<Tuple2<Integer, Integer>> maxFunction = new MaxAggregationFunction<Tuple2<Integer, Integer>>(
-				1, type1);
-		MaxAggregationFunction<Integer> maxFunction0 = new MaxAggregationFunction<Integer>(0, type2);
-
+		TypeInformation<Tuple2<Integer, Integer>> type1 = TypeExtractor
+				.getForObject(new Tuple2<Integer, Integer>(0, 0));
+		TypeInformation<Integer> type2 = TypeExtractor.getForObject(2);
+
+		ReduceFunction<Tuple2<Integer, Integer>> sumFunction = SumAggregator.getSumFunction(1,
+				Integer.class, type1);
+		ReduceFunction<Integer> sumFunction0 = SumAggregator
+				.getSumFunction(0, Integer.class, type2);
+		ReduceFunction<Tuple2<Integer, Integer>> minFunction = ComparableAggregator
+				.getAggregator(1, type1, AggregationType.MIN);
+		ReduceFunction<Integer> minFunction0 = ComparableAggregator.getAggregator(0,
+				type2, AggregationType.MIN);
+		ReduceFunction<Tuple2<Integer, Integer>> maxFunction = ComparableAggregator
+				.getAggregator(1, type1, AggregationType.MAX);
+		ReduceFunction<Integer> maxFunction0 = ComparableAggregator.getAggregator(0,
+				type2, AggregationType.MAX);
 		List<Tuple2<Integer, Integer>> sumList = MockInvokable.createAndExecute(
 				new StreamReduceInvokable<Tuple2<Integer, Integer>>(sumFunction), getInputList());
 
@@ -157,15 +156,15 @@ public class AggregationFunctionTest {
 			// Nothing to do here
 		}
 
-		MaxByAggregationFunction<Tuple2<Integer, Integer>> maxByFunctionFirst = new MaxByAggregationFunction<Tuple2<Integer, Integer>>(
-				0, true, type1);
-		MaxByAggregationFunction<Tuple2<Integer, Integer>> maxByFunctionLast = new MaxByAggregationFunction<Tuple2<Integer, Integer>>(
-				0, false, type1);
+		ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionFirst = ComparableAggregator
+				.getAggregator(0, type1, AggregationType.MAXBY, true);
+		ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionLast = ComparableAggregator
+				.getAggregator(0, type1, AggregationType.MAXBY, false);
 
-		MinByAggregationFunction<Tuple2<Integer, Integer>> minByFunctionFirst = new MinByAggregationFunction<Tuple2<Integer, Integer>>(
-				0, true, type1);
-		MinByAggregationFunction<Tuple2<Integer, Integer>> minByFunctionLast = new MinByAggregationFunction<Tuple2<Integer, Integer>>(
-				0, false, type1);
+		ReduceFunction<Tuple2<Integer, Integer>> minByFunctionFirst = ComparableAggregator
+				.getAggregator(0, type1, AggregationType.MINBY, true);
+		ReduceFunction<Tuple2<Integer, Integer>> minByFunctionLast = ComparableAggregator
+				.getAggregator(0, type1, AggregationType.MINBY, false);
 
 		List<Tuple2<Integer, Integer>> maxByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
 		maxByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
@@ -228,17 +227,18 @@ public class AggregationFunctionTest {
 
 	@Test
 	public void minMaxByTest() {
-		TypeInformation<?> type1 = TypeExtractor.getForObject(new Tuple2<Integer, Integer>(0, 0));
-
-		MaxByAggregationFunction<Tuple2<Integer, Integer>> maxByFunctionFirst = new MaxByAggregationFunction<Tuple2<Integer, Integer>>(
-				0, true, type1);
-		MaxByAggregationFunction<Tuple2<Integer, Integer>> maxByFunctionLast = new MaxByAggregationFunction<Tuple2<Integer, Integer>>(
-				0, false, type1);
-
-		MinByAggregationFunction<Tuple2<Integer, Integer>> minByFunctionFirst = new MinByAggregationFunction<Tuple2<Integer, Integer>>(
-				0, true, type1);
-		MinByAggregationFunction<Tuple2<Integer, Integer>> minByFunctionLast = new MinByAggregationFunction<Tuple2<Integer, Integer>>(
-				0, false, type1);
+		TypeInformation<Tuple2<Integer, Integer>> type1 = TypeExtractor
+				.getForObject(new Tuple2<Integer, Integer>(0, 0));
+
+		ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionFirst = ComparableAggregator
+				.getAggregator(0, type1, AggregationType.MAXBY, true);
+		ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionLast = ComparableAggregator
+				.getAggregator(0, type1, AggregationType.MAXBY, false);
+
+		ReduceFunction<Tuple2<Integer, Integer>> minByFunctionFirst = ComparableAggregator
+				.getAggregator(0, type1, AggregationType.MINBY, true);
+		ReduceFunction<Tuple2<Integer, Integer>> minByFunctionLast = ComparableAggregator
+				.getAggregator(0, type1, AggregationType.MINBY, false);
 
 		List<Tuple2<Integer, Integer>> maxByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
 		maxByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));


[3/4] incubator-flink git commit: [streaming] Pojo wordcount example added to streaming

Posted by mb...@apache.org.
[streaming] Pojo wordcount example added to streaming


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/97d465b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/97d465b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/97d465b4

Branch: refs/heads/master
Commit: 97d465b496dc3f502a989f6df5c3362084c41a12
Parents: 7ae5804
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Nov 5 17:49:34 2014 +0100
Committer: mbalassi <ba...@gmail.com>
Committed: Sun Nov 9 13:16:46 2014 +0100

----------------------------------------------------------------------
 .../examples/wordcount/PojoWordCount.java       | 169 +++++++++++++++++++
 .../examples/java/wordcount/PojoExample.java    |   4 +-
 2 files changed, 171 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/97d465b4/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoWordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoWordCount.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoWordCount.java
new file mode 100644
index 0000000..e95f042
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoWordCount.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.examples.wordcount;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+/**
+ * This example shows an implementation of Wordcount without using the Tuple2
+ * type, but a custom class.
+ *
+ */
+public class PojoWordCount {
+
+	/**
+	 * This is the POJO (Plain Old Java Object) that is being used for all the
+	 * operations. As long as all fields are public or have a getter/setter, the
+	 * system can handle them
+	 */
+	public static class Word {
+		// fields
+		private String word;
+		private Integer frequency;
+
+		// constructors
+		public Word() {
+		}
+
+		public Word(String word, int i) {
+			this.word = word;
+			this.frequency = i;
+		}
+
+		// getters setters
+		public String getWord() {
+			return word;
+		}
+
+		public void setWord(String word) {
+			this.word = word;
+		}
+
+		public Integer getFrequency() {
+			return frequency;
+		}
+
+		public void setFrequency(Integer frequency) {
+			this.frequency = frequency;
+		}
+
+		// to String
+		@Override
+		public String toString() {
+			return "Word=" + word + " freq=" + frequency;
+		}
+	}
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		DataStream<String> text = getTextDataStream(env);
+
+		DataStream<Word> counts =
+		// split up the lines into Word objects
+		text.flatMap(new Tokenizer())
+		// group by the field word and sum up the frequency
+		.groupBy("word")
+		.sum("frequency");
+
+		if (fileOutput) {
+			counts.writeAsText(outputPath);
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.execute("WordCount-Pojo Example");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Implements the string tokenizer that splits sentences into words as a
+	 * user-defined FlatMapFunction. The function takes a line (String) and
+	 * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
+	 * Integer>).
+	 */
+	public static final class Tokenizer implements FlatMapFunction<String, Word> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(String value, Collector<Word> out) {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Word(token, 1));
+				}
+			}
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: WordCount <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WordCount example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: WordCount <text path> <result path>");
+		}
+		return true;
+	}
+
+	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		} else {
+			// get default test text data
+			return env.fromElements(WordCountData.WORDS);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/97d465b4/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
index 363c7a3..f3364fd 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
@@ -84,9 +84,9 @@ public class PojoExample {
 		DataSet<String> text = getTextDataSet(env);
 		
 		DataSet<Word> counts = 
-			// split up the lines in pairs (2-tuples) containing: (word,1)
+			// split up the lines into Word objects (with frequency = 1)
 			text.flatMap(new Tokenizer())
-			// group by the tuple field "0" and sum up tuple field "1"
+			// group by the field word and sum up the frequency
 			.groupBy("word")
 			.reduce(new ReduceFunction<Word>() {
 				@Override


[4/4] incubator-flink git commit: [streaming] Updated streaming groupBy operators to allow grouping on field expressions

Posted by mb...@apache.org.
[streaming] Updated streaming groupBy operators to allow grouping on field expressions


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c9f3846f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c9f3846f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c9f3846f

Branch: refs/heads/master
Commit: c9f3846fa7fa673f23583db554a29a5ee309d1d7
Parents: e580497
Author: Gyula Fora <gy...@apache.org>
Authored: Tue Nov 4 23:00:18 2014 +0100
Committer: mbalassi <ba...@gmail.com>
Committed: Sun Nov 9 13:16:46 2014 +0100

----------------------------------------------------------------------
 .../api/datastream/BatchedDataStream.java       |  28 ++++-
 .../api/datastream/CoBatchedDataStream.java     |  22 ++++
 .../api/datastream/CoWindowDataStream.java      |  26 +++++
 .../api/datastream/ConnectedDataStream.java     |  71 +++++++++++++
 .../streaming/api/datastream/DataStream.java    | 106 +++++++++++++++++--
 .../api/datastream/WindowDataStream.java        |   9 +-
 .../api/function/co/JoinWindowFunction.java     |  15 ++-
 .../streaming/util/keys/ArrayKeySelector.java   |  45 ++++++++
 .../streaming/util/keys/FieldsKeySelector.java  |  83 ++++-----------
 .../streaming/util/keys/ObjectKeySelector.java  |  30 ++++++
 .../streaming/util/keys/PojoKeySelector.java    |  98 +++++++++++++++++
 .../streaming/util/keys/TupleKeySelector.java   |  44 ++++++++
 .../streaming/api/AggregationFunctionTest.java  |  11 +-
 .../operator/CoGroupedBatchReduceTest.java      |  10 +-
 .../invokable/operator/CoGroupedReduceTest.java |   8 +-
 .../operator/CoGroupedWindowReduceTest.java     |  10 +-
 .../operator/GroupedBatchGroupReduceTest.java   |   9 +-
 .../operator/GroupedBatchReduceTest.java        |   7 +-
 .../operator/GroupedReduceInvokableTest.java    |   4 +-
 .../GroupedWindowGroupReduceInvokableTest.java  |   4 +-
 .../operator/WindowReduceInvokableTest.java     |   5 +-
 .../partitioner/FieldsPartitionerTest.java      |   5 +-
 .../streaming/util/FieldsKeySelectorTest.java   |  41 ++-----
 .../java/typeutils/runtime/PojoComparator.java  |   2 +-
 24 files changed, 543 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
index 75eadcf..86cb90b 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
@@ -75,15 +75,33 @@ public class BatchedDataStream<OUT> {
 
 	/**
 	 * Groups the elements of the {@link BatchedDataStream} by the given key
-	 * position to be used with grouped operators.
+	 * positions to be used with grouped operators.
 	 * 
-	 * @param keyPosition
-	 *            The position of the field on which the
+	 * @param fields
+	 *            The position of the fields on which the
 	 *            {@link BatchedDataStream} will be grouped.
 	 * @return The transformed {@link BatchedDataStream}
 	 */
-	public BatchedDataStream<OUT> groupBy(int keyPosition) {
-		return new BatchedDataStream<OUT>(dataStream.groupBy(keyPosition), batchSize, slideSize);
+	public BatchedDataStream<OUT> groupBy(int... fields) {
+		return new BatchedDataStream<OUT>(dataStream.groupBy(fields), batchSize, slideSize);
+	}
+
+	/**
+	 * Groups a {@link BatchedDataStream} using field expressions. A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link BatchedDataStream}S underlying type. A dot can
+	 * be used to drill down into objects, as in
+	 * {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param fields
+	 *            One or more field expressions on which the DataStream will be
+	 *            grouped.
+	 * @return The grouped {@link BatchedDataStream}
+	 **/
+	public BatchedDataStream<OUT> groupBy(String... fields) {
+
+		return new BatchedDataStream<OUT>(dataStream.groupBy(fields), batchSize, slideSize);
+
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
index bfe679d..387c356 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoBatchReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchReduceInvokable;
@@ -83,6 +84,27 @@ public class CoBatchedDataStream<IN1, IN2> extends ConnectedDataStream<IN1, IN2>
 				dataStream2.groupBy(keyPosition2), batchSize1, batchSize2, slideSize1, slideSize2);
 	}
 
+	public ConnectedDataStream<IN1, IN2> groupBy(int[] keyPositions1, int[] keyPositions2) {
+		return new CoBatchedDataStream<IN1, IN2>(dataStream1.groupBy(keyPositions1),
+				dataStream2.groupBy(keyPositions2), batchSize1, batchSize2, slideSize1, slideSize2);
+	}
+
+	public ConnectedDataStream<IN1, IN2> groupBy(String field1, String field2) {
+		return new CoBatchedDataStream<IN1, IN2>(dataStream1.groupBy(field1),
+				dataStream2.groupBy(field2), batchSize1, batchSize2, slideSize1, slideSize2);
+	}
+
+	public ConnectedDataStream<IN1, IN2> groupBy(String[] fields1, String[] fields2) {
+		return new CoBatchedDataStream<IN1, IN2>(dataStream1.groupBy(fields1),
+				dataStream2.groupBy(fields2), batchSize1, batchSize2, slideSize1, slideSize2);
+	}
+
+	public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1,
+			KeySelector<IN2, ?> keySelector2) {
+		return new CoBatchedDataStream<IN1, IN2>(dataStream1.groupBy(keySelector1),
+				dataStream2.groupBy(keySelector2), batchSize1, batchSize2, slideSize1, slideSize2);
+	}
+
 	@Override
 	protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
 			CoReduceFunction<IN1, IN2, OUT> coReducer) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
index 95165df..6e47873 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
@@ -65,6 +66,31 @@ public class CoWindowDataStream<IN1, IN2> extends CoBatchedDataStream<IN1, IN2>
 				timeStamp1, timeStamp2);
 	}
 
+	public ConnectedDataStream<IN1, IN2> groupBy(int[] keyPositions1, int[] keyPositions2) {
+		return new CoWindowDataStream<IN1, IN2>(dataStream1.groupBy(keyPositions1),
+				dataStream2.groupBy(keyPositions2), batchSize1, batchSize2, slideSize1, slideSize2,
+				timeStamp1, timeStamp2);
+	}
+
+	public ConnectedDataStream<IN1, IN2> groupBy(String field1, String field2) {
+		return new CoWindowDataStream<IN1, IN2>(dataStream1.groupBy(field1),
+				dataStream2.groupBy(field2), batchSize1, batchSize2, slideSize1, slideSize2,
+				timeStamp1, timeStamp2);
+	}
+
+	public ConnectedDataStream<IN1, IN2> groupBy(String[] fields1, String[] fields2) {
+		return new CoWindowDataStream<IN1, IN2>(dataStream1.groupBy(fields1),
+				dataStream2.groupBy(fields2), batchSize1, batchSize2, slideSize1, slideSize2,
+				timeStamp1, timeStamp2);
+	}
+
+	public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1,
+			KeySelector<IN2, ?> keySelector2) {
+		return new CoWindowDataStream<IN1, IN2>(dataStream1.groupBy(keySelector1),
+				dataStream2.groupBy(keySelector2), batchSize1, batchSize2, slideSize1, slideSize2,
+				timeStamp1, timeStamp2);
+	}
+
 	@Override
 	protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
 			CoReduceFunction<IN1, IN2, OUT> coReducer) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 33dbc7e..23c420c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -154,6 +154,60 @@ public class ConnectedDataStream<IN1, IN2> {
 
 	/**
 	 * GroupBy operation for connected data stream. Groups the elements of
+	 * input1 and input2 according to keyPositions1 and keyPositions2. Used for
+	 * applying function on grouped data streams for example
+	 * {@link ConnectedDataStream#reduce}
+	 * 
+	 * @param keyPositions1
+	 *            The fields used to group the first input stream.
+	 * @param keyPositions2
+	 *            The fields used to group the second input stream.
+	 * @return @return The transformed {@link ConnectedDataStream}
+	 */
+	public ConnectedDataStream<IN1, IN2> groupBy(int[] keyPositions1, int[] keyPositions2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.groupBy(keyPositions1),
+				dataStream2.groupBy(keyPositions2));
+	}
+
+	/**
+	 * GroupBy operation for connected data stream using key expressions. Groups
+	 * the elements of input1 and input2 according to field1 and field2. A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream}S underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field1
+	 *            The grouping expression for the first input
+	 * @param field2
+	 *            The grouping expression for the second input
+	 * @return The grouped {@link ConnectedDataStream}
+	 */
+	public ConnectedDataStream<IN1, IN2> groupBy(String field1, String field2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.groupBy(field1),
+				dataStream2.groupBy(field2));
+	}
+
+	/**
+	 * GroupBy operation for connected data stream using key expressions. Groups
+	 * the elements of input1 and input2 according to fields1 and fields2. A
+	 * field expression is either the name of a public field or a getter method
+	 * with parentheses of the {@link DataStream}S underlying type. A dot can be
+	 * used to drill down into objects, as in {@code "field1.getInnerField2()" }
+	 * .
+	 * 
+	 * @param fields1
+	 *            The grouping expressions for the first input
+	 * @param fields2
+	 *            The grouping expressions for the second input
+	 * @return The grouped {@link ConnectedDataStream}
+	 */
+	public ConnectedDataStream<IN1, IN2> groupBy(String[] fields1, String[] fields2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.groupBy(fields1),
+				dataStream2.groupBy(fields2));
+	}
+
+	/**
+	 * GroupBy operation for connected data stream. Groups the elements of
 	 * input1 and input2 using keySelector1 and keySelector2. Used for applying
 	 * function on grouped data streams for example
 	 * {@link ConnectedDataStream#reduce}
@@ -510,6 +564,13 @@ public class ConnectedDataStream<IN1, IN2> {
 	}
 
 	SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
+			String fieldIn1, String fieldIn2) {
+
+		return windowJoin(windowSize, slideInterval, new DefaultTimeStamp<IN1>(),
+				new DefaultTimeStamp<IN2>(), fieldIn1, fieldIn2);
+	}
+
+	SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
 			int fieldIn1, int fieldIn2) {
 
 		return windowJoin(windowSize, slideInterval, new DefaultTimeStamp<IN1>(),
@@ -526,6 +587,16 @@ public class ConnectedDataStream<IN1, IN2> {
 				timestamp2);
 	}
 
+	SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
+			TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2, String fieldIn1, String fieldIn2) {
+
+		JoinWindowFunction<IN1, IN2> joinWindowFunction = new JoinWindowFunction<IN1, IN2>(
+				dataStream1.getOutputType(), dataStream2.getOutputType(), fieldIn1, fieldIn2);
+
+		return addGeneralWindowJoin(joinWindowFunction, windowSize, slideInterval, timestamp1,
+				timestamp2);
+	}
+
 	private SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> addGeneralWindowJoin(
 			CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> coWindowFunction, long windowSize,
 			long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index c777003..b50c42d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -69,6 +69,7 @@ import org.apache.flink.streaming.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.PojoKeySelector;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
 import org.apache.flink.streaming.util.serialization.TypeWrapper;
@@ -373,6 +374,30 @@ public class DataStream<OUT> {
 	 * @param slideInterval
 	 *            After every function call the windows will be slid by this
 	 *            interval.
+	 * @param fieldIn1
+	 *            The field in the first stream to be matched.
+	 * @param fieldIn2
+	 *            The field in the second stream to be matched.
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowJoin(
+			DataStream<IN2> dataStreamToJoin, long windowSize, long slideInterval, String fieldIn1,
+			String fieldIn2) {
+		return this.windowJoin(dataStreamToJoin, windowSize, slideInterval,
+				new DefaultTimeStamp<OUT>(), new DefaultTimeStamp<IN2>(), fieldIn1, fieldIn2);
+	}
+
+	/**
+	 * Creates a join of a data stream based on the given positions.
+	 * 
+	 * @param dataStreamToJoin
+	 *            {@link DataStream} to join with.
+	 * @param windowSize
+	 *            Size of the windows that will be aligned for both streams in
+	 *            milliseconds.
+	 * @param slideInterval
+	 *            After every function call the windows will be slid by this
+	 *            interval.
 	 * @param timestamp1
 	 *            User defined time stamps for the first input.
 	 * @param timestamp2
@@ -391,8 +416,36 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Sets the partitioning of the {@link DataStream} so that the output tuples
-	 * are partitioned by the hashcodes of the selected fields.
+	 * Creates a join of a data stream based on the given positions.
+	 * 
+	 * @param dataStreamToJoin
+	 *            {@link DataStream} to join with.
+	 * @param windowSize
+	 *            Size of the windows that will be aligned for both streams in
+	 *            milliseconds.
+	 * @param slideInterval
+	 *            After every function call the windows will be slid by this
+	 *            interval.
+	 * @param timestamp1
+	 *            User defined time stamps for the first input.
+	 * @param timestamp2
+	 *            User defined time stamps for the second input.
+	 * @param fieldIn1
+	 *            The field in the first stream to be matched.
+	 * @param fieldIn2
+	 *            The field in the second stream to be matched.
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowJoin(
+			DataStream<IN2> dataStreamToJoin, long windowSize, long slideInterval,
+			TimeStamp<OUT> timestamp1, TimeStamp<IN2> timestamp2, String fieldIn1, String fieldIn2) {
+		return this.connect(dataStreamToJoin).windowJoin(windowSize, slideInterval, timestamp1,
+				timestamp2, fieldIn1, fieldIn2);
+	}
+
+	/**
+	 * Sets the partitioning of the {@link DataStream} so that the output is
+	 * partitioned by the selected fields.
 	 * 
 	 * @param fields
 	 *            The fields to partition by.
@@ -400,10 +453,31 @@ public class DataStream<OUT> {
 	 */
 	public DataStream<OUT> partitionBy(int... fields) {
 
-		return setConnectionType(new FieldsPartitioner<OUT>(new FieldsKeySelector<OUT>(
+		return setConnectionType(new FieldsPartitioner<OUT>(FieldsKeySelector.getSelector(
 				getOutputType(), fields)));
 	}
 
+	/**
+	 * Sets the partitioning of the {@link DataStream} so that the output is
+	 * partitioned by the given field expressions.
+	 * 
+	 * @param fields
+	 *            The fields expressions to partition by.
+	 * @return The DataStream with fields partitioning set.
+	 */
+	public DataStream<OUT> partitionBy(String... fields) {
+
+		return setConnectionType(new FieldsPartitioner<OUT>(new PojoKeySelector<OUT>(
+				getOutputType(), fields)));
+	}
+
+	/**
+	 * Sets the partitioning of the {@link DataStream} so that the output is
+	 * partitioned using the given {@link KeySelector}.
+	 * 
+	 * @param keySelector
+	 * @return
+	 */
 	public DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) {
 		return setConnectionType(new FieldsPartitioner<OUT>(keySelector));
 	}
@@ -547,10 +621,30 @@ public class DataStream<OUT> {
 	 * @param fields
 	 *            The position of the fields on which the {@link DataStream}
 	 *            will be grouped.
-	 * @return The transformed {@link DataStream}
+	 * @return The grouped {@link DataStream}
 	 */
 	public GroupedDataStream<OUT> groupBy(int... fields) {
-		return groupBy(new FieldsKeySelector<OUT>(getOutputType(), fields));
+
+		return groupBy(FieldsKeySelector.getSelector(getOutputType(), fields));
+
+	}
+
+	/**
+	 * Groups a {@link DataStream} using field expressions. A field expression
+	 * is either the name of a public field or a getter method with parentheses
+	 * of the {@link DataStream}S underlying type. A dot can be used to drill
+	 * down into objects, as in {@code "field1.getInnerField2()" }. This method
+	 * returns an {@link GroupedDataStream}.
+	 * 
+	 * @param fields
+	 *            One or more field expressions on which the DataStream will be
+	 *            grouped.
+	 * @return The grouped {@link DataStream}
+	 **/
+	public GroupedDataStream<OUT> groupBy(String... fields) {
+
+		return groupBy(new PojoKeySelector<OUT>(getOutputType(), fields));
+
 	}
 
 	/**
@@ -561,7 +655,7 @@ public class DataStream<OUT> {
 	 * @param keySelector
 	 *            The {@link KeySelector} that will be used to extract keys for
 	 *            the values
-	 * @return The transformed {@link DataStream}
+	 * @return The grouped {@link DataStream}
 	 */
 	public GroupedDataStream<OUT> groupBy(KeySelector<OUT, ?> keySelector) {
 		return new GroupedDataStream<OUT>(this, keySelector);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
index d523b9a..be0e37f 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
@@ -51,8 +51,13 @@ public class WindowDataStream<OUT> extends BatchedDataStream<OUT> {
 		this.timeStamp = windowDataStream.timeStamp;
 	}
 
-	public WindowDataStream<OUT> groupBy(int keyPosition) {
-		return new WindowDataStream<OUT>(dataStream.groupBy(keyPosition), batchSize, slideSize,
+	public WindowDataStream<OUT> groupBy(int... keyPositions) {
+		return new WindowDataStream<OUT>(dataStream.groupBy(keyPositions), batchSize, slideSize,
+				timeStamp);
+	}
+
+	public WindowDataStream<OUT> groupBy(String... fields) {
+		return new WindowDataStream<OUT>(dataStream.groupBy(fields), batchSize, slideSize,
 				timeStamp);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
index 3dea04d..cb501c6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
@@ -24,21 +24,28 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.PojoKeySelector;
 import org.apache.flink.util.Collector;
 
 public class JoinWindowFunction<IN1, IN2> implements CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> {
 	private static final long serialVersionUID = 1L;
 
-	private KeySelector<IN1, Object> keySelector1;
-	private KeySelector<IN2, Object> keySelector2;
+	private KeySelector<IN1, ?> keySelector1;
+	private KeySelector<IN2, ?> keySelector2;
 
 	public JoinWindowFunction() {
 	}
 
 	public JoinWindowFunction(TypeInformation<IN1> inType1, TypeInformation<IN2> inType2,
 			int positionIn1, int positionIn2) {
-		keySelector1 = new FieldsKeySelector<IN1>(inType1, positionIn1);
-		keySelector2 = new FieldsKeySelector<IN2>(inType2, positionIn2);
+		keySelector1 = FieldsKeySelector.getSelector(inType1, positionIn1);
+		keySelector2 = FieldsKeySelector.getSelector(inType2, positionIn2);
+	}
+
+	public JoinWindowFunction(TypeInformation<IN1> inType1, TypeInformation<IN2> inType2,
+			String field1, String field2) {
+		keySelector1 = new PojoKeySelector<IN1>(inType1, field1);
+		keySelector2 = new PojoKeySelector<IN2>(inType2, field2);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ArrayKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ArrayKeySelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ArrayKeySelector.java
new file mode 100644
index 0000000..6d6f620
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ArrayKeySelector.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.keys;
+
+import java.lang.reflect.Array;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+public class ArrayKeySelector<IN> extends FieldsKeySelector<IN> {
+
+	private static final long serialVersionUID = 1L;
+
+	public ArrayKeySelector(int... fields) {
+		super(fields);
+	}
+
+	@Override
+	public Object getKey(IN value) throws Exception {
+		if (simpleKey) {
+			return Array.get(value, keyFields[0]);
+		} else {
+			int c = 0;
+			for (int pos : keyFields) {
+				((Tuple) key).setField(Array.get(value, pos), c);
+				c++;
+			}
+			return key;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java
index 664d545..d785109 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.streaming.util.keys;
 
-import java.lang.reflect.Array;
-
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -50,15 +48,13 @@ import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.api.java.tuple.Tuple8;
 import org.apache.flink.api.java.tuple.Tuple9;
 
-public class FieldsKeySelector<IN> implements KeySelector<IN, Object> {
+public abstract class FieldsKeySelector<IN> implements KeySelector<IN, Object> {
 
 	private static final long serialVersionUID = 1L;
 
-	int[] keyFields;
-	boolean isTuple;
-	boolean isArray;
-	int numberOfKeys;
-	Object key;
+	protected int[] keyFields;
+	protected Object key;
+	protected boolean simpleKey;
 
 	public static Class<?>[] tupleClasses = new Class[] { Tuple1.class, Tuple2.class, Tuple3.class,
 			Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class,
@@ -67,73 +63,40 @@ public class FieldsKeySelector<IN> implements KeySelector<IN, Object> {
 			Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class,
 			Tuple25.class };
 
-	public FieldsKeySelector(boolean isTuple, boolean isArray, int... fields) {
+	public FieldsKeySelector(int... fields) {
 		this.keyFields = fields;
-		this.numberOfKeys = fields.length;
-		this.isTuple = isTuple;
-		this.isArray = isArray;
-
+		this.simpleKey = fields.length == 1;
 		for (int i : fields) {
 			if (i < 0) {
 				throw new RuntimeException("Grouping fields must be non-negative");
 			}
 		}
 
-		if (numberOfKeys > 1) {
-			if (!this.isTuple && !this.isArray) {
-				throw new RuntimeException(
-						"For non-tuple types use single field 0 or KeyExctractor for grouping");
-			} else {
-				try {
-					key = tupleClasses[fields.length - 1].newInstance();
-				} catch (Exception e) {
-					throw new RuntimeException(e.getMessage());
-				}
-			}
-		} else {
-			if (!this.isTuple && !this.isArray) {
-				if (fields[0] > 0) {
-					throw new RuntimeException(
-							"For simple objects grouping only allowed on the first field");
-				}
-			}
-			key = null;
+		try {
+			key = (Tuple) tupleClasses[fields.length - 1].newInstance();
+		} catch (Exception e) {
+			throw new RuntimeException(e.getMessage());
 		}
-	}
 
-	public FieldsKeySelector(TypeInformation<IN> type, int... fields) {
-		this(type.isTupleType(),
-				(type instanceof BasicArrayTypeInfo || type instanceof PrimitiveArrayTypeInfo),
-				fields);
 	}
 
-	@Override
-	public Object getKey(IN value) throws Exception {
-		if (numberOfKeys > 1) {
-			int c = 0;
-			if (isTuple) {
-				for (int pos : keyFields) {
-					((Tuple) key).setField(((Tuple) value).getField(pos), c);
-					c++;
-				}
-			} else {
-				// if array type
-				for (int pos : keyFields) {
-					((Tuple) key).setField(Array.get(value, pos), c);
-					c++;
-				}
-
-			}
+	public static <R> KeySelector<R, ?> getSelector(TypeInformation<R> type, int... fields) {
+		if (type.isTupleType()) {
+			return new TupleKeySelector<R>(fields);
+		} else if (type instanceof BasicArrayTypeInfo || type instanceof PrimitiveArrayTypeInfo) {
+			return new ArrayKeySelector<R>(fields);
 		} else {
-			if (isTuple) {
-				key = ((Tuple) value).getField(keyFields[0]);
-			} else if (isArray) {
-				key = Array.get(value, keyFields[0]);
+			if (fields.length > 1) {
+				throw new RuntimeException(
+						"For non-tuple types use single field 0 or KeyExctractor for grouping");
+
+			} else if (fields[0] > 0) {
+				throw new RuntimeException(
+						"For simple objects grouping only allowed on the first field");
 			} else {
-				key = value;
+				return new ObjectKeySelector<R>();
 			}
 		}
-		return key;
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ObjectKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ObjectKeySelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ObjectKeySelector.java
new file mode 100644
index 0000000..cf4ecc6
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ObjectKeySelector.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.keys;
+
+import org.apache.flink.api.java.functions.KeySelector;
+
+public class ObjectKeySelector<IN> implements KeySelector<IN, IN> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public IN getKey(IN value) throws Exception {
+		return value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/PojoKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/PojoKeySelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/PojoKeySelector.java
new file mode 100644
index 0000000..618f14f
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/PojoKeySelector.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.keys;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
+
+public class PojoKeySelector<IN> extends FieldsKeySelector<IN> {
+
+	private static final long serialVersionUID = 1L;
+
+	PojoComparator<IN> comparator;
+
+	public PojoKeySelector(TypeInformation<IN> type, String... fields) {
+		super(new int[removeDuplicates(fields).length]);
+		if (!(type instanceof CompositeType<?>)) {
+			throw new IllegalArgumentException(
+					"Key expressions are only supported on POJO types and Tuples. "
+							+ "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
+		}
+		CompositeType<IN> cType = (CompositeType<IN>) type;
+
+		String[] keyFields = removeDuplicates(fields);
+		int numOfKeys = keyFields.length;
+
+		List<FlatFieldDescriptor> fieldDescriptors = new ArrayList<FlatFieldDescriptor>();
+		for (String field : keyFields) {
+			cType.getKey(field, 0, fieldDescriptors);
+		}
+
+		int[] logicalKeyPositions = new int[numOfKeys];
+		boolean[] orders = new boolean[numOfKeys];
+
+		for (int i = 0; i < numOfKeys; i++) {
+			logicalKeyPositions[i] = fieldDescriptors.get(i).getPosition();
+		}
+
+		if (cType instanceof PojoTypeInfo) {
+			comparator = (PojoComparator<IN>) cType
+					.createComparator(logicalKeyPositions, orders, 0);
+		} else {
+			throw new IllegalArgumentException(
+					"Key expressions are only supported on POJO types. "
+							+ "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
+		}
+
+	}
+
+	@Override
+	public Object getKey(IN value) throws Exception {
+
+		Field[] keyFields = comparator.getKeyFields();
+		if (simpleKey) {
+			return comparator.accessField(keyFields[0], value);
+		} else {
+			int c = 0;
+			for (Field field : keyFields) {
+				((Tuple) key).setField(comparator.accessField(field, value), c);
+				c++;
+			}
+		}
+		return key;
+	}
+
+	private static String[] removeDuplicates(String[] in) {
+		List<String> ret = new LinkedList<String>();
+		for (String el : in) {
+			if (!ret.contains(el)) {
+				ret.add(el);
+			}
+		}
+		return ret.toArray(new String[ret.size()]);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/TupleKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/TupleKeySelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/TupleKeySelector.java
new file mode 100644
index 0000000..4ca64ef
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/TupleKeySelector.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.keys;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+public class TupleKeySelector<IN> extends FieldsKeySelector<IN> {
+
+	private static final long serialVersionUID = 1L;
+
+	public TupleKeySelector(int... fields) {
+		super(fields);
+	}
+
+	@Override
+	public Object getKey(IN value) throws Exception {
+		if (simpleKey) {
+			return ((Tuple) value).getField(keyFields[0]);
+		} else {
+			int c = 0;
+			for (int pos : keyFields) {
+				((Tuple) key).setField(((Tuple) value).getField(pos), c);
+				c++;
+			}
+			return key;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index 70e6118..95c6f71 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -35,7 +35,7 @@ import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunctio
 import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
 import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.TupleKeySelector;
 import org.junit.Test;
 
 public class AggregationFunctionTest {
@@ -114,18 +114,15 @@ public class AggregationFunctionTest {
 
 		List<Tuple2<Integer, Integer>> groupedSumList = MockInvokable.createAndExecute(
 				new GroupedReduceInvokable<Tuple2<Integer, Integer>>(sumFunction,
-						new FieldsKeySelector<Tuple2<Integer, Integer>>(true, false, 0)),
-				getInputList());
+						new TupleKeySelector<Tuple2<Integer, Integer>>(0)), getInputList());
 
 		List<Tuple2<Integer, Integer>> groupedMinList = MockInvokable.createAndExecute(
 				new GroupedReduceInvokable<Tuple2<Integer, Integer>>(minFunction,
-						new FieldsKeySelector<Tuple2<Integer, Integer>>(true, false, 0)),
-				getInputList());
+						new TupleKeySelector<Tuple2<Integer, Integer>>(0)), getInputList());
 
 		List<Tuple2<Integer, Integer>> groupedMaxList = MockInvokable.createAndExecute(
 				new GroupedReduceInvokable<Tuple2<Integer, Integer>>(maxFunction,
-						new FieldsKeySelector<Tuple2<Integer, Integer>>(true, false, 0)),
-				getInputList());
+						new TupleKeySelector<Tuple2<Integer, Integer>>(0)), getInputList());
 
 		assertEquals(expectedSumList, sumList);
 		assertEquals(expectedMinList, minList);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
index 229053b..ce01a7d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchReduceInvokable;
 import org.apache.flink.streaming.util.MockCoInvokable;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.TupleKeySelector;
 import org.junit.Test;
 
 public class CoGroupedBatchReduceTest {
@@ -96,8 +96,8 @@ public class CoGroupedBatchReduceTest {
 		expected.add("h");
 
 		CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String> invokable = new CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String>(
-				new MyCoReduceFunction(), 4L, 3L, 4L, 3L, new FieldsKeySelector(true, false, 0),
-				new FieldsKeySelector(true, false, 0));
+				new MyCoReduceFunction(), 4L, 3L, 4L, 3L, new TupleKeySelector(0),
+				new TupleKeySelector(0));
 
 		List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
 
@@ -143,8 +143,8 @@ public class CoGroupedBatchReduceTest {
 		expected.add("fh");
 
 		CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String> invokable = new CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String>(
-				new MyCoReduceFunction(), 4L, 3L, 2L, 2L, new FieldsKeySelector(true, false, 0),
-				new FieldsKeySelector(true, false, 0));
+				new MyCoReduceFunction(), 4L, 3L, 2L, 2L, new TupleKeySelector(0),
+				new TupleKeySelector(0));
 
 		List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
index e3557c5..4570e23 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
 import org.apache.flink.streaming.util.MockCoInvokable;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.TupleKeySelector;
 import org.junit.Test;
 
 public class CoGroupedReduceTest {
@@ -72,8 +72,7 @@ public class CoGroupedReduceTest {
 		Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1, 5);
 
 		CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> invokable = new CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
-				new MyCoReduceFunction(), new FieldsKeySelector(true, false, 0),
-				new FieldsKeySelector(true, false, 0));
+				new MyCoReduceFunction(), new TupleKeySelector(0), new TupleKeySelector(0));
 
 		List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5",
 				"7");
@@ -84,8 +83,7 @@ public class CoGroupedReduceTest {
 		assertEquals(expected, actualList);
 
 		invokable = new CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
-				new MyCoReduceFunction(), new FieldsKeySelector(true, false, 2),
-				new FieldsKeySelector(true, false, 0));
+				new MyCoReduceFunction(), new TupleKeySelector(2), new TupleKeySelector(0));
 
 		expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5", "7");
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
index 1469b32..f36a7b5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.util.MockCoInvokable;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.TupleKeySelector;
 import org.junit.Test;
 
 public class CoGroupedWindowReduceTest {
@@ -125,8 +125,8 @@ public class CoGroupedWindowReduceTest {
 		expected.add("i");
 
 		CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String> invokable = new CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String>(
-				new MyCoReduceFunction(), 4L, 3L, 4L, 3L, new FieldsKeySelector(true, false, 0),
-				new FieldsKeySelector(true, false, 0), new MyTimeStamp<Tuple2<String, Integer>>(
+				new MyCoReduceFunction(), 4L, 3L, 4L, 3L, new TupleKeySelector(0),
+				new TupleKeySelector( 0), new MyTimeStamp<Tuple2<String, Integer>>(
 						timestamps1), new MyTimeStamp<Tuple2<String, String>>(timestamps2));
 
 		List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
@@ -178,8 +178,8 @@ public class CoGroupedWindowReduceTest {
 		expected.add("fh");
 
 		CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String> invokable = new CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String>(
-				new MyCoReduceFunction(), 4L, 3L, 2L, 2L, new FieldsKeySelector(true, false, 0),
-				new FieldsKeySelector(true, false, 0), new MyTimeStamp<Tuple2<String, Integer>>(
+				new MyCoReduceFunction(), 4L, 3L, 2L, 2L, new TupleKeySelector( 0),
+				new TupleKeySelector( 0), new MyTimeStamp<Tuple2<String, Integer>>(
 						timestamps1), new MyTimeStamp<Tuple2<String, String>>(timestamps2));
 
 		List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
index 4c70a5c..db2b8cf 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
@@ -27,7 +27,8 @@ import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.ObjectKeySelector;
+import org.apache.flink.streaming.util.keys.TupleKeySelector;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
@@ -72,8 +73,9 @@ public class GroupedBatchGroupReduceTest {
 	@SuppressWarnings("unchecked")
 	@Test
 	public void slidingBatchGroupReduceTest() {
+		@SuppressWarnings("rawtypes")
 		GroupedBatchGroupReduceInvokable<Integer, String> invokable1 = new GroupedBatchGroupReduceInvokable<Integer, String>(
-				new MySlidingBatchReduce1(), 2, 2, new FieldsKeySelector<Integer>(false, false, 0));
+				new MySlidingBatchReduce1(), 2, 2, new ObjectKeySelector());
 
 		List<String> expected = Arrays.asList("1", "1", END_OF_GROUP, "3", "3", END_OF_GROUP, "2",
 				END_OF_GROUP);
@@ -83,8 +85,7 @@ public class GroupedBatchGroupReduceTest {
 		assertEquals(expected, actual);
 
 		GroupedBatchGroupReduceInvokable<Tuple2<Integer, String>, String> invokable2 = new GroupedBatchGroupReduceInvokable<Tuple2<Integer, String>, String>(
-				new MySlidingBatchReduce2(), 2, 2, new FieldsKeySelector<Tuple2<Integer, String>>(
-						true, false, 1));
+				new MySlidingBatchReduce2(), 2, 2, new TupleKeySelector<Tuple2<Integer, String>>(1));
 
 		expected = Arrays.asList("open", "1", "2", END_OF_GROUP, "open", "3", "3", END_OF_GROUP,
 				"open", "4", END_OF_GROUP);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
index 4b1a235..783119c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
@@ -26,7 +26,8 @@ import java.util.List;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.ObjectKeySelector;
+import org.apache.flink.streaming.util.keys.TupleKeySelector;
 import org.junit.Test;
 
 public class GroupedBatchReduceTest {
@@ -60,7 +61,7 @@ public class GroupedBatchReduceTest {
 					public Integer reduce(Integer value1, Integer value2) throws Exception {
 						return value1 + value2;
 					}
-				}, 3, 2, new FieldsKeySelector<Integer>(false, false, 0));
+				}, 3, 2, new ObjectKeySelector<Integer>());
 
 		List<Integer> actual = MockInvokable.createAndExecute(invokable, inputs);
 		assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(actual));
@@ -94,7 +95,7 @@ public class GroupedBatchReduceTest {
 							return value2;
 						}
 					}
-				}, 3, 3, new FieldsKeySelector<Tuple2<Integer, String>>(true, false, 1));
+				}, 3, 3, new TupleKeySelector<Tuple2<Integer, String>>(1));
 
 		List<Tuple2<Integer, String>> actual2 = MockInvokable.createAndExecute(invokable2, inputs2);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
index 6aadc43..0b68207 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
@@ -24,7 +24,7 @@ import java.util.List;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.ObjectKeySelector;
 import org.junit.Test;
 
 public class GroupedReduceInvokableTest {
@@ -43,7 +43,7 @@ public class GroupedReduceInvokableTest {
 	@Test
 	public void test() {
 		GroupedReduceInvokable<Integer> invokable1 = new GroupedReduceInvokable<Integer>(
-				new MyReducer(), new FieldsKeySelector<Integer>(false, false, 0));
+				new MyReducer(), new ObjectKeySelector<Integer>());
 
 		List<Integer> expected = Arrays.asList(1, 2, 2, 4, 3);
 		List<Integer> actual = MockInvokable.createAndExecute(invokable1,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokableTest.java
index 6054d3b..e93647f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokableTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.TupleKeySelector;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
@@ -73,7 +73,7 @@ public class GroupedWindowGroupReduceInvokableTest {
 						}
 						out.collect(outTuple);
 					}
-				}, 2, 3, new FieldsKeySelector(true, false, 0),
+				}, 2, 3, new TupleKeySelector( 0),
 				new TimeStamp<Tuple2<String, Integer>>() {
 					private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
index ed1ed8c..5d10eff 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.TupleKeySelector;
 import org.junit.Test;
 
 public class WindowReduceInvokableTest {
@@ -106,8 +106,7 @@ public class WindowReduceInvokableTest {
 							Tuple2<String, Integer> value2) throws Exception {
 						return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
 					}
-				}, 2, 3, new FieldsKeySelector(true, false, 0),
-				new TimeStamp<Tuple2<String, Integer>>() {
+				}, 2, 3, new TupleKeySelector(0), new TimeStamp<Tuple2<String, Integer>>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
index 347debc..18b3015 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.TupleKeySelector;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -42,8 +42,7 @@ public class FieldsPartitionerTest {
 
 	@Before
 	public void setPartitioner() {
-		fieldsPartitioner = new FieldsPartitioner<Tuple>(new FieldsKeySelector<Tuple>(true, false,
-				0));
+		fieldsPartitioner = new FieldsPartitioner<Tuple>(new TupleKeySelector<Tuple>(0));
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldsKeySelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldsKeySelectorTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldsKeySelectorTest.java
index 8987921..98b60b5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldsKeySelectorTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldsKeySelectorTest.java
@@ -18,17 +18,17 @@
 package org.apache.flink.streaming.util;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.ObjectKeySelector;
+import org.apache.flink.streaming.util.keys.TupleKeySelector;
 import org.junit.Test;
 
 public class FieldsKeySelectorTest {
 
-	@SuppressWarnings("unused")
 	@Test
 	public void testGetKey() throws Exception {
 
@@ -36,48 +36,23 @@ public class FieldsKeySelectorTest {
 		Tuple2<Integer, String> t = new Tuple2<Integer, String>(-1, "a");
 		double[] a = new double[] { 0.0, 1.2 };
 
-		KeySelector<Integer, ?> ks1 = new FieldsKeySelector<Integer>(TypeExtractor.getForObject(i),
-				0);
-
-		try {
-			KeySelector<Integer, ?> ks2 = new FieldsKeySelector<Integer>(
-					TypeExtractor.getForObject(i), 2, 1);
-			fail();
-		} catch (RuntimeException e) {
-
-		}
-
-		try {
-			KeySelector<Integer, ?> ks2 = new FieldsKeySelector<Integer>(
-					TypeExtractor.getForObject(i), -1);
-			fail();
-		} catch (RuntimeException e) {
-
-		}
+		KeySelector<Integer, ?> ks1 = new ObjectKeySelector<Integer>();
 
 		assertEquals(ks1.getKey(i), 5);
 
-		KeySelector<Tuple2<Integer, String>, ?> ks3 = new FieldsKeySelector<Tuple2<Integer, String>>(
-				TypeExtractor.getForObject(t), 1);
+		KeySelector<Tuple2<Integer, String>, ?> ks3 = new TupleKeySelector<Tuple2<Integer, String>>(
+				1);
 		assertEquals(ks3.getKey(t), "a");
 
-		try {
-			KeySelector<Tuple2<Integer, String>, ?> ks2 = new FieldsKeySelector<Tuple2<Integer, String>>(
-					TypeExtractor.getForObject(t), 1, -1);
-			fail();
-		} catch (RuntimeException e) {
-
-		}
-
-		KeySelector<Tuple2<Integer, String>, ?> ks4 = new FieldsKeySelector<Tuple2<Integer, String>>(
+		KeySelector<Tuple2<Integer, String>, ?> ks4 = FieldsKeySelector.getSelector(
 				TypeExtractor.getForObject(t), 1, 1);
 		assertEquals(ks4.getKey(t), new Tuple2<String, String>("a", "a"));
 
-		KeySelector<double[], ?> ks5 = new FieldsKeySelector<double[]>(
+		KeySelector<double[], ?> ks5 = FieldsKeySelector.getSelector(
 				TypeExtractor.getForObject(a), 0);
 		assertEquals(ks5.getKey(a), 0.0);
 
-		KeySelector<double[], ?> ks6 = new FieldsKeySelector<double[]>(
+		KeySelector<double[], ?> ks6 = FieldsKeySelector.getSelector(
 				TypeExtractor.getForObject(a), 1, 0);
 		assertEquals(ks6.getKey(a), new Tuple2<Double, Double>(1.2, 0.0));
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
index 7c15ecd..ae4a806 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
@@ -192,7 +192,7 @@ public final class PojoComparator<T> extends CompositeTypeComparator<T> implemen
 	/**
 	 * This method is handling the IllegalAccess exceptions of Field.get()
 	 */
-	private final Object accessField(Field field, Object object) {
+	public final Object accessField(Field field, Object object) {
 		try {
 			object = field.get(object);
 		} catch (NullPointerException npex) {