You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/05 16:42:44 UTC

[10/13] flink git commit: [FLINK-2550] Remove groupBy and GroupedDataStream

[FLINK-2550] Remove groupBy and GroupedDataStream

Their functionality is subsumed by keyBy and KeyedStream


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9baadfe8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9baadfe8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9baadfe8

Branch: refs/heads/master
Commit: 9baadfe8464976b039e3f5859b910c4ad2e29ac5
Parents: 23d8e26
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Oct 1 17:56:13 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 5 16:36:35 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    |   6 +-
 .../api/FlinkTopologyBuilder.java               |   4 +-
 .../wordcount/BoltTokenizerWordCount.java       |   2 +-
 .../wordcount/BoltTokenizerWordCountPojo.java   |   2 +-
 .../BoltTokenizerWordCountWithNames.java        |   2 +-
 .../wordcount/SpoutSourceWordCount.java         |   2 +-
 .../examples/java8/wordcount/WordCount.java     |   2 +-
 .../main/java/SocketTextStreamWordCount.java    |   2 +-
 .../main/scala/SocketTextStreamWordCount.scala  |   2 +-
 .../connectors/twitter/TwitterTopology.java     |   2 +-
 .../api/datastream/ConnectedStreams.java        |  40 +--
 .../streaming/api/datastream/DataStream.java    |  62 ----
 .../api/datastream/DiscretizedStream.java       |  20 +-
 .../api/datastream/GroupedDataStream.java       | 329 -------------------
 .../api/datastream/IterativeDataStream.java     |  19 +-
 .../streaming/api/datastream/KeyedStream.java   | 297 ++++++++++++++++-
 .../api/datastream/WindowedDataStream.java      |  62 ++--
 .../datastream/temporal/StreamJoinOperator.java |   8 +-
 .../flink/streaming/api/CoStreamTest.java       |   4 +-
 .../flink/streaming/api/DataStreamTest.java     |  72 ++--
 .../apache/flink/streaming/api/IterateTest.java |   6 +-
 .../streaming/api/StreamingOperatorsITCase.java |   4 +-
 .../api/complex/ComplexIntegrationTest.java     |   4 +-
 .../api/operators/co/SelfConnectionTest.java    |   4 +-
 .../windowing/ParallelMergeITCase.java          |   2 +-
 .../operators/windowing/WindowingITCase.java    |  14 +-
 .../api/outputformat/CsvOutputFormatITCase.java |   2 +-
 .../outputformat/SocketOutputFormatITCase.java  |   2 +-
 .../outputformat/TextOutputFormatITCase.java    |   2 +-
 .../socket/SocketTextStreamWordCount.java       |   2 +-
 .../examples/twitter/TwitterStream.java         |   2 +-
 .../examples/windowing/SessionWindowing.java    |   2 +-
 .../examples/windowing/TopSpeedWindowing.java   |   2 +-
 .../examples/windowing/WindowWordCount.java     |   2 +-
 .../examples/wordcount/PojoExample.java         |   2 +-
 .../streaming/examples/wordcount/WordCount.java |   2 +-
 .../socket/SocketTextStreamWordCount.scala      |   2 +-
 .../examples/windowing/TopSpeedWindowing.scala  |   2 +-
 .../streaming/api/scala/ConnectedStreams.scala  |  20 +-
 .../flink/streaming/api/scala/DataStream.scala  |  37 +--
 .../streaming/api/scala/GroupedDataStream.scala | 196 -----------
 .../flink/streaming/api/scala/KeyedStream.scala | 195 +++++++++++
 .../api/scala/StreamJoinOperator.scala          |   4 +-
 .../api/scala/WindowedDataStream.scala          |  10 +-
 .../flink/streaming/api/scala/package.scala     |  10 +-
 .../streaming/api/scala/DataStreamTest.scala    |  26 +-
 .../api/scala/OutputFormatTestPrograms.scala    |   6 +-
 .../api/scala/StreamingOperatorsITCase.scala    |   2 +-
 .../CoStreamCheckpointingITCase.java            |   2 +-
 .../PartitionedStateCheckpointingITCase.java    |   2 +-
 .../StreamCheckpointNotifierITCase.java         |   2 +-
 .../StreamCheckpointingITCase.java              |   2 +-
 .../test/classloading/jar/StreamingProgram.java |   2 +-
 53 files changed, 690 insertions(+), 823 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index c437114..3f5a98f 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -627,7 +627,7 @@ dataStream.filter{ _ != 0 }
 	<br/>
 	A map that produces a rolling average per key:</p>
 {% highlight scala %}
-dataStream.keyBy(..).mapWithState((in, state: Option[(Long, Int)]) => state match {
+dataStream.groupBy(..).mapWithState((in, state: Option[(Long, Int)]) => state match {
 	case Some((sum, count)) => ((sum + in)/(count + 1), Some((sum + in, count + 1)))
 	case None => (in, Some((in, 1)))
 })
@@ -713,7 +713,7 @@ dataStream.union(otherStream1, otherStream2, …)
 
 ### Grouped operators
 
-Some transformations require that the elements of a `DataStream` are grouped on some key. The user can create a `GroupedDataStream` by calling the `groupBy(key)` method of a non-grouped `DataStream`. 
+Some transformations require that the elements of a `DataStream` are grouped on some key. The user can create a `GroupedDataStream` by calling the `groupBy(key)` method of a non-grouped `DataStream`.
 Keys can be of three types: field positions (applicable for tuple/array types), field expressions (applicable for pojo types), KeySelector instances.
 
 Aggregation or reduce operators called on `GroupedDataStream`s produce elements on a per group basis.
@@ -1313,7 +1313,7 @@ Checkpointing of the states needs to be enabled from the `StreamExecutionEnviron
 
 Operator states can be accessed from the `RuntimeContext` using the `getOperatorState(“name”, defaultValue, partitioned)` method so it is only accessible in `RichFunction`s. A recommended usage pattern is to retrieve the operator state in the `open(…)` method of the operator and set it as a field in the operator instance for runtime usage. Multiple `OperatorState`s can be used simultaneously by the same operator by using different names to identify them.
 
-Partitioned operator state works only on `KeyedDataStreams`. A `KeyedDataStream` can be created from `DataStream` using the `keyBy` or `groupBy` methods. The `keyBy` method simply takes a `KeySelector` to derive the keys by which the operator state will be partitioned, however, it does not affect the actual partitioning of the `DataStream` records. If data partitioning is also desired then the `groupBy`  method should be used instead to create a `GroupedDataStream` which is a subtype of `KeyedDataStream`. Mind that `KeyedDataStreams` do not support repartitioning (e.g. `shuffle(), forward(), groupBy(...)`).
+Partitioned operator state works only on `KeyedDataStreams`. A `KeyedDataStream` can be created from `DataStream` using the `groupBy` or `groupBy` methods. The `groupBy` method simply takes a `KeySelector` to derive the keys by which the operator state will be partitioned, however, it does not affect the actual partitioning of the `DataStream` records. If data partitioning is also desired then the `groupBy`  method should be used instead to create a `GroupedDataStream` which is a subtype of `KeyedDataStream`. Mind that `KeyedDataStreams` do not support repartitioning (e.g. `shuffle(), forward(), groupBy(...)`).
 
 By default operator states are checkpointed using default java serialization thus they need to be `Serializable`. The user can gain more control over the state checkpoint mechanism by passing a `StateCheckpointer` instance when retrieving the `OperatorState` from the `RuntimeContext`. The `StateCheckpointer` allows custom implementations for the checkpointing logic for increased efficiency and to store arbitrary non-serializable states.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
index e4d880f..e2d819c 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
@@ -195,12 +195,12 @@ public class FlinkTopologyBuilder {
 								if (fields.size() > 0) {
 									FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
 									if (producer.size() == 1) {
-										inputStream = inputStream.groupBy(prodDeclarer
+										inputStream = inputStream.keyBy(prodDeclarer
 												.getGroupingFieldIndexes(inputStreamId,
 														grouping.get_fields()));
 									} else {
 										inputStream = inputStream
-												.groupBy(new SplitStreamTypeKeySelector(
+												.keyBy(new SplitStreamTypeKeySelector(
 														prodDeclarer.getGroupingFieldIndexes(
 																inputStreamId,
 																grouping.get_fields())));

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
index eab58f5..6f7b6fb 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
@@ -69,7 +69,7 @@ public class BoltTokenizerWordCount {
 						new StormBoltWrapper<String, Tuple2<String, Integer>>(new StormBoltTokenizer()))
 				// split up the lines in pairs (2-tuples) containing: (word,1)
 				// group by the tuple field "0" and sum up tuple field "1"
-				.groupBy(0).sum(1);
+				.keyBy(0).sum(1);
 
 		// emit result
 		if (fileOutput) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
index 20e69db..300f5bc 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
@@ -77,7 +77,7 @@ public class BoltTokenizerWordCountPojo {
 								new StormBoltTokenizerByName()))
 				// split up the lines in pairs (2-tuples) containing: (word,1)
 				// group by the tuple field "0" and sum up tuple field "1"
-				.groupBy(0).sum(1);
+				.keyBy(0).sum(1);
 
 		// emit result
 		if (fileOutput) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
index e233da1..ed01181 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
@@ -80,7 +80,7 @@ public class BoltTokenizerWordCountWithNames {
 								new StormBoltTokenizerByName(), new Fields("sentence")))
 				// split up the lines in pairs (2-tuples) containing: (word,1)
 				// group by the tuple field "0" and sum up tuple field "1"
-				.groupBy(0).sum(1);
+				.keyBy(0).sum(1);
 
 		// emit result
 		if (fileOutput) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
index cbd054b..21d7811 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
@@ -70,7 +70,7 @@ public class SpoutSourceWordCount {
 				// split up the lines in pairs (2-tuples) containing: (word,1)
 				text.flatMap(new Tokenizer())
 				// group by the tuple field "0" and sum up tuple field "1"
-				.groupBy(0).sum(1);
+				.keyBy(0).sum(1);
 
 		// emit result
 		if (fileOutput) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java b/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
index c1d6042..41a674f 100644
--- a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
+++ b/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
@@ -73,7 +73,7 @@ public class WordCount {
 					.forEach(t -> out.collect(new Tuple2<>(t, 1)));
 				})
 				// group by the tuple field "0" and sum up tuple field "1"
-				.groupBy(0)
+				.keyBy(0)
 				.sum(1);
 
 		// emit result

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/SocketTextStreamWordCount.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/SocketTextStreamWordCount.java b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/SocketTextStreamWordCount.java
index 3888d7b..10d8044 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/SocketTextStreamWordCount.java
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/SocketTextStreamWordCount.java
@@ -76,7 +76,7 @@ public class SocketTextStreamWordCount {
 		// split up the lines in pairs (2-tuples) containing: (word,1)
 		text.flatMap(new LineSplitter())
 		// group by the tuple field "0" and sum up tuple field "1"
-				.groupBy(0)
+				.keyBy(0)
 				.sum(1);
 
 		counts.print();

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/SocketTextStreamWordCount.scala
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/SocketTextStreamWordCount.scala b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/SocketTextStreamWordCount.scala
index 63d840d..9bc85ea 100644
--- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/SocketTextStreamWordCount.scala
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/SocketTextStreamWordCount.scala
@@ -59,7 +59,7 @@ object SocketTextStreamWordCount {
     val text = env.socketTextStream(hostName, port)
     val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
       .map { (_, 1) }
-      .groupBy(0)
+      .keyBy(0)
       .sum(1)
 
     counts print

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
index d5e8c41..b1fc92c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
@@ -82,7 +82,7 @@ public class TwitterTopology {
 						return new Tuple2<String, Integer>(value, 1);
 					}
 				})
-				.groupBy(0)
+				.keyBy(0)
 				.sum(1);
 
 		dataStream.print();

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index 2447c1e..4074a1d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -96,7 +96,7 @@ public class ConnectedStreams<IN1, IN2> {
 	}
 
 	/**
-	 * GroupBy operation for connected data stream. Groups the elements of
+	 * KeyBy operation for connected data stream. Assigns keys to the elements of
 	 * input1 and input2 according to keyPosition1 and keyPosition2.
 	 *
 	 * @param keyPosition1
@@ -107,13 +107,13 @@ public class ConnectedStreams<IN1, IN2> {
 	 *            second input stream.
 	 * @return The grouped {@link ConnectedStreams}
 	 */
-	public ConnectedStreams<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
-		return new ConnectedStreams<>(this.environment, inputStream1.groupBy(keyPosition1),
-				inputStream2.groupBy(keyPosition2));
+	public ConnectedStreams<IN1, IN2> keyBy(int keyPosition1, int keyPosition2) {
+		return new ConnectedStreams<>(this.environment, inputStream1.keyBy(keyPosition1),
+				inputStream2.keyBy(keyPosition2));
 	}
 
 	/**
-	 * GroupBy operation for connected data stream. Groups the elements of
+	 * KeyBy operation for connected data stream. Assigns keys to the elements of
 	 * input1 and input2 according to keyPositions1 and keyPositions2.
 	 *
 	 * @param keyPositions1
@@ -122,13 +122,13 @@ public class ConnectedStreams<IN1, IN2> {
 	 *            The fields used to group the second input stream.
 	 * @return The grouped {@link ConnectedStreams}
 	 */
-	public ConnectedStreams<IN1, IN2> groupBy(int[] keyPositions1, int[] keyPositions2) {
-		return new ConnectedStreams<>(environment, inputStream1.groupBy(keyPositions1),
-				inputStream2.groupBy(keyPositions2));
+	public ConnectedStreams<IN1, IN2> keyBy(int[] keyPositions1, int[] keyPositions2) {
+		return new ConnectedStreams<>(environment, inputStream1.keyBy(keyPositions1),
+				inputStream2.keyBy(keyPositions2));
 	}
 
 	/**
-	 * GroupBy operation for connected data stream using key expressions. Groups
+	 * KeyBy operation for connected data stream using key expressions. Assigns keys to
 	 * the elements of input1 and input2 according to field1 and field2. A field
 	 * expression is either the name of a public field or a getter method with
 	 * parentheses of the {@link DataStream}S underlying type. A dot can be used
@@ -140,13 +140,13 @@ public class ConnectedStreams<IN1, IN2> {
 	 *            The grouping expression for the second input
 	 * @return The grouped {@link ConnectedStreams}
 	 */
-	public ConnectedStreams<IN1, IN2> groupBy(String field1, String field2) {
-		return new ConnectedStreams<>(environment, inputStream1.groupBy(field1),
-				inputStream2.groupBy(field2));
+	public ConnectedStreams<IN1, IN2> keyBy(String field1, String field2) {
+		return new ConnectedStreams<>(environment, inputStream1.keyBy(field1),
+				inputStream2.keyBy(field2));
 	}
 
 	/**
-	 * GroupBy operation for connected data stream using key expressions. Groups
+	 * KeyBy operation for connected data stream using key expressions.
 	 * the elements of input1 and input2 according to fields1 and fields2. A
 	 * field expression is either the name of a public field or a getter method
 	 * with parentheses of the {@link DataStream}S underlying type. A dot can be
@@ -159,13 +159,13 @@ public class ConnectedStreams<IN1, IN2> {
 	 *            The grouping expressions for the second input
 	 * @return The grouped {@link ConnectedStreams}
 	 */
-	public ConnectedStreams<IN1, IN2> groupBy(String[] fields1, String[] fields2) {
-		return new ConnectedStreams<>(environment, inputStream1.groupBy(fields1),
-				inputStream2.groupBy(fields2));
+	public ConnectedStreams<IN1, IN2> keyBy(String[] fields1, String[] fields2) {
+		return new ConnectedStreams<>(environment, inputStream1.keyBy(fields1),
+				inputStream2.keyBy(fields2));
 	}
 
 	/**
-	 * GroupBy operation for connected data stream. Groups the elements of
+	 * KeyBy operation for connected data stream. Assigns keys to the elements of
 	 * input1 and input2 using keySelector1 and keySelector2.
 	 *
 	 * @param keySelector1
@@ -174,9 +174,9 @@ public class ConnectedStreams<IN1, IN2> {
 	 *            The {@link KeySelector} used for grouping the second input
 	 * @return The partitioned {@link ConnectedStreams}
 	 */
-	public ConnectedStreams<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
-		return new ConnectedStreams<>(environment, inputStream1.groupBy(keySelector1),
-				inputStream2.groupBy(keySelector2));
+	public ConnectedStreams<IN1, IN2> keyBy(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
+		return new ConnectedStreams<>(environment, inputStream1.keyBy(keySelector1),
+				inputStream2.keyBy(keySelector2));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 3389016..6d88416 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -278,68 +278,6 @@ public class DataStream<T> {
 		return new KeyedStream<T, Tuple>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
 				getType(), getExecutionConfig())));
 	}
-	
-	/**
-	 * Partitions the operator state of a {@link DataStream} by the given key positions. 
-	 * Mind that keyBy does not affect the partitioning of the {@link DataStream}
-	 * but only the way explicit state is partitioned among parallel instances.
-	 * 
-	 * @param fields
-	 *            The position of the fields on which the states of the {@link DataStream}
-	 *            will be partitioned.
-	 * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
-	 */
-	public GroupedDataStream<T, Tuple> groupBy(int... fields) {
-		if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
-			return groupBy(new KeySelectorUtil.ArrayKeySelector<T>(fields));
-		} else {
-			return groupBy(new Keys.ExpressionKeys<T>(fields, getType()));
-		}
-	}
-
-	/**
-	 * Groups a {@link DataStream} using field expressions. A field expression
-	 * is either the name of a public field or a getter method with parentheses
-	 * of the {@link DataStream}S underlying type. A dot can be used to drill
-	 * down into objects, as in {@code "field1.getInnerField2()" }. This method
-	 * returns an {@link GroupedDataStream}.
-	 *
-	 * <p>
-	 * This operator also affects the
-	 * partitioning of the stream, by forcing values with the same key to go to
-	 * the same processing instance.
-	 * 
-	 * @param fields
-	 *            One or more field expressions on which the DataStream will be
-	 *            grouped.
-	 * @return The grouped {@link DataStream}
-	 **/
-	public GroupedDataStream<T, Tuple> groupBy(String... fields) {
-		return groupBy(new Keys.ExpressionKeys<T>(fields, getType()));
-	}
-
-	/**
-	 * Groups the elements of a {@link DataStream} by the key extracted by the
-	 * {@link KeySelector} to be used with grouped operators like
-	 * {@link GroupedDataStream#reduce(org.apache.flink.api.common.functions.ReduceFunction)}.
-	 *
-	 * <p/>
-	 * This operator also affects the partitioning of the stream, by forcing
-	 * values with the same key to go to the same processing instance.
-	 * 
-	 * @param keySelector
-	 *            The {@link KeySelector} that will be used to extract keys for
-	 *            the values
-	 * @return The grouped {@link DataStream}
-	 */
-	public <K> GroupedDataStream<T, K> groupBy(KeySelector<T, K> keySelector) {
-		return new GroupedDataStream<T, K>(this, clean(keySelector));
-	}
-
-	private GroupedDataStream<T, Tuple> groupBy(Keys<T> keys) {
-		return new GroupedDataStream<T, Tuple>(this, 
-				clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig())));
-	}
 
 	/**
 	 * Sets the partitioning of the {@link DataStream} so that the output is

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index 5893295..18c2cee 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -66,10 +66,10 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 	protected boolean isPartitioned = false;
 
 	protected DiscretizedStream(SingleOutputStreamOperator<StreamWindow<OUT>, ?> discretizedStream,
-			KeySelector<OUT, ?> groupByKey, WindowTransformation tranformation,
+			KeySelector<OUT, ?> keyByKey, WindowTransformation tranformation,
 			boolean isPartitioned) {
 		super();
-		this.groupByKey = groupByKey;
+		this.keyByKey = keyByKey;
 		this.discretizedStream = discretizedStream;
 		this.transformation = tranformation;
 		this.isPartitioned = isPartitioned;
@@ -151,8 +151,8 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 				: new ParallelMerge<OUT>(reduceFunction);
 
 		return reduced.discretizedStream
-				.groupBy(new WindowKey<OUT>())
-				.connect(numOfParts.groupBy(0))
+				.keyBy(new WindowKey<OUT>())
+				.connect(numOfParts.keyBy(0))
 				.transform(
 						"CoFlatMap",
 						reduced.discretizedStream.getType(),
@@ -218,9 +218,9 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 
 		if (isGrouped()) {
 			DiscretizedStream<OUT> out = transform(transformation, "Window partitioner", getType(),
-					new WindowPartitioner<OUT>(groupByKey)).setParallelism(parallelism);
+					new WindowPartitioner<OUT>(keyByKey)).setParallelism(parallelism);
 
-			out.groupByKey = null;
+			out.keyByKey = null;
 			out.isPartitioned = true;
 
 			return out;
@@ -247,7 +247,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 		// Only merge partitioned streams
 		if (isPartitioned) {
 			return wrap(
-					discretizedStream.groupBy(new WindowKey<OUT>()).transform("Window Merger",
+					discretizedStream.keyBy(new WindowKey<OUT>()).transform("Window Merger",
 							type, new WindowMerger<OUT>()).setParallelism(discretizedStream.getParallelism()), false);
 		} else {
 			return this;
@@ -258,14 +258,14 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 	@SuppressWarnings("unchecked")
 	private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator<StreamWindow<R>, ?> stream,
 			boolean isPartitioned) {
-		return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.groupByKey,
+		return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.keyByKey,
 				transformation, isPartitioned);
 	}
 
 	@SuppressWarnings("unchecked")
 	private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator<StreamWindow<R>, ?> stream,
 			WindowTransformation transformation) {
-		return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.groupByKey,
+		return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.keyByKey,
 				transformation, isPartitioned);
 	}
 
@@ -329,7 +329,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 	}
 
 	protected DiscretizedStream<OUT> copy() {
-		return new DiscretizedStream<OUT>(discretizedStream, groupByKey, transformation, isPartitioned);
+		return new DiscretizedStream<OUT>(discretizedStream, keyByKey, transformation, isPartitioned);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
deleted file mode 100644
index ebaeb56..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
-import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.operators.StreamGroupedFold;
-import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
-
-/**
- * A GroupedDataStream represents a {@link DataStream} which has been
- * partitioned by the given {@link KeySelector}. Operators like {@link #reduce},
- * {@link #fold} etc. can be applied on the {@link GroupedDataStream} to
- * get additional functionality by the grouping.
- *
- * @param <T> The type of the elements in the Grouped Stream.
- * @param <KEY> The type of the key in the Keyed Stream.
- */
-public class GroupedDataStream<T, KEY> extends KeyedStream<T, KEY> {
-
-	/**
-	 * Creates a new {@link GroupedDataStream}, group inclusion is determined using
-	 * a {@link KeySelector} on the elements of the {@link DataStream}.
-	 *
-	 * @param dataStream Base stream of data
-	 * @param keySelector Function for determining group inclusion
-	 */
-	public GroupedDataStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
-		super(dataStream, keySelector);
-	}
-
-
-	/**
-	 * Applies a reduce transformation on the grouped data stream grouped on by
-	 * the given key position. The {@link ReduceFunction} will receive input
-	 * values based on the key value. Only input values with the same key will
-	 * go to the same reducer.
-	 * 
-	 * @param reducer
-	 *            The {@link ReduceFunction} that will be called for every
-	 *            element of the input values with the same key.
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer) {
-		return transform("Grouped Reduce", getType(), new StreamGroupedReduce<T>(
-				clean(reducer), keySelector));
-	}
-
-	/**
-	 * Applies a fold transformation on the grouped data stream grouped on by
-	 * the given key position. The {@link FoldFunction} will receive input
-	 * values based on the key value. Only input values with the same key will
-	 * go to the same folder.
-	 * 
-	 * @param folder
-	 *            The {@link FoldFunction} that will be called for every element
-	 *            of the input values with the same key.
-	 * @param initialValue
-	 *            The initialValue passed to the folders for each key.
-	 * @return The transformed DataStream.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> folder) {
-
-		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
-				Utils.getCallLocationName(), true);
-
-		return transform("Grouped Fold", outType, new StreamGroupedFold<T, R>(clean(folder),
-				keySelector, initialValue));
-	}
-
-	/**
-	 * Applies an aggregation that gives a rolling sum of the data stream at the
-	 * given position grouped by the given key. An independent aggregate is kept
-	 * per key.
-	 * 
-	 * @param positionToSum
-	 *            The position in the data point to sum
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
-		return aggregate(new SumAggregator<T>(positionToSum, getType(), getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current sum of the pojo data
-	 * stream at the given field expressionby the given key. An independent
-	 * aggregate is kept per key. A field expression is either the name of a
-	 * public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
-	 * 
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> sum(String field) {
-		return aggregate(new SumAggregator<T>(field, getType(), getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current minimum of the data
-	 * stream at the given position by the given key. An independent aggregate
-	 * is kept per key.
-	 * 
-	 * @param positionToMin
-	 *            The position in the data point to minimize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
-		return aggregate(new ComparableAggregator<T>(positionToMin, getType(), AggregationType.MIN,
-				getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current minimum of the pojo
-	 * data stream at the given field expression by the given key. An
-	 * independent aggregate is kept per key. A field expression is either the
-	 * name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
-	 * 
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> min(String field) {
-		return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MIN,
-				false, getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that gives the current maximum of the data stream
-	 * at the given position by the given key. An independent aggregate is kept
-	 * per key.
-	 * 
-	 * @param positionToMax
-	 *            The position in the data point to maximize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
-		return aggregate(new ComparableAggregator<T>(positionToMax, getType(), AggregationType.MAX,
-				getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current maximum of the pojo
-	 * data stream at the given field expression by the given key. An
-	 * independent aggregate is kept per key. A field expression is either the
-	 * name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
-	 * 
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> max(String field) {
-		return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAX,
-				false, getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current minimum element of the
-	 * pojo data stream by the given field expression by the given key. An
-	 * independent aggregate is kept per key. A field expression is either the
-	 * name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
-	 * 
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
-	 * @param first
-	 *            If True then in case of field equality the first object will
-	 *            be returned
-	 * @return The transformed DataStream.
-	 */
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) {
-		return aggregate(new ComparableAggregator(field, getType(), AggregationType.MINBY,
-				first, getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current maximum element of the
-	 * pojo data stream by the given field expression by the given key. An
-	 * independent aggregate is kept per key. A field expression is either the
-	 * name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
-	 * 
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
-	 * @param first
-	 *            If True then in case of field equality the first object will
-	 *            be returned
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) {
-		return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAXBY,
-				first, getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current element with the
-	 * minimum value at the given position by the given key. An independent
-	 * aggregate is kept per key. If more elements have the minimum value at the
-	 * given position, the operator returns the first one by default.
-	 * 
-	 * @param positionToMinBy
-	 *            The position in the data point to minimize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
-		return this.minBy(positionToMinBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current element with the
-	 * minimum value at the given position by the given key. An independent
-	 * aggregate is kept per key. If more elements have the minimum value at the
-	 * given position, the operator returns the first one by default.
-	 * 
-	 * @param positionToMinBy
-	 *            The position in the data point to minimize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
-		return this.minBy(positionToMinBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current element with the
-	 * minimum value at the given position by the given key. An independent
-	 * aggregate is kept per key. If more elements have the minimum value at the
-	 * given position, the operator returns either the first or last one,
-	 * depending on the parameter set.
-	 * 
-	 * @param positionToMinBy
-	 *            The position in the data point to minimize
-	 * @param first
-	 *            If true, then the operator return the first element with the
-	 *            minimal value, otherwise returns the last
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) {
-		return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationType.MINBY, first,
-				getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current element with the
-	 * maximum value at the given position by the given key. An independent
-	 * aggregate is kept per key. If more elements have the maximum value at the
-	 * given position, the operator returns the first one by default.
-	 * 
-	 * @param positionToMaxBy
-	 *            The position in the data point to maximize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
-		return this.maxBy(positionToMaxBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current element with the
-	 * maximum value at the given position by the given key. An independent
-	 * aggregate is kept per key. If more elements have the maximum value at the
-	 * given position, the operator returns the first one by default.
-	 * 
-	 * @param positionToMaxBy
-	 *            The position in the data point to maximize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
-		return this.maxBy(positionToMaxBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current element with the
-	 * maximum value at the given position by the given key. An independent
-	 * aggregate is kept per key. If more elements have the maximum value at the
-	 * given position, the operator returns either the first or last one,
-	 * depending on the parameter set.
-	 * 
-	 * @param positionToMaxBy
-	 *            The position in the data point to maximize.
-	 * @param first
-	 *            If true, then the operator return the first element with the
-	 *            maximum value, otherwise returns the last
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) {
-		return aggregate(new ComparableAggregator<T>(positionToMaxBy, getType(), AggregationType.MAXBY, first,
-				getExecutionConfig()));
-	}
-
-	protected SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregate) {
-		StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(clean(aggregate), keySelector);
-		return transform("Grouped Aggregation", getType(), operator);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 2fe3848..75216ec 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -188,20 +188,17 @@ public class IterativeDataStream<T> extends SingleOutputStreamOperator<T, Iterat
 				"Cannot change the input partitioning of an iteration head directly. Apply the partitioning on the input and feedback streams instead.");
 		
 		@Override
-		public ConnectedStreams<I, F> groupBy(int keyPosition1, int keyPosition2) {throw groupingException;}
-		
-		@Override
-		public ConnectedStreams<I, F> groupBy(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
-		
+		public ConnectedStreams<I, F> keyBy(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
+
 		@Override
-		public ConnectedStreams<I, F> groupBy(String field1, String field2) {throw groupingException;}
-		
+		public ConnectedStreams<I, F> keyBy(String field1, String field2) {throw groupingException;}
+
 		@Override
-		public ConnectedStreams<I, F> groupBy(String[] fields1, String[] fields2) {throw groupingException;}
-		
+		public ConnectedStreams<I, F> keyBy(String[] fields1, String[] fields2) {throw groupingException;}
+
 		@Override
-		public ConnectedStreams<I, F> groupBy(KeySelector<I, ?> keySelector1,KeySelector<F, ?> keySelector2) {throw groupingException;}
-		
+		public ConnectedStreams<I, F> keyBy(KeySelector<I, ?> keySelector1,KeySelector<F, ?> keySelector2) {throw groupingException;}
+
 		@Override
 		public ConnectedStreams<I, F> partitionByHash(int keyPosition1, int keyPosition2) {throw groupingException;}
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index b3cfb55..265886b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -17,10 +17,19 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamGroupedFold;
+import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
@@ -39,9 +48,12 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
  * A {@code KeyedStream} represents a {@link DataStream} on which operator state is
  * partitioned by key using a provided {@link KeySelector}. Typical operations supported by a
  * {@code DataStream} are also possible on a {@code KeyedStream}, with the exception of
- * partitioning methods such as shuffle, forward and groupBy.
- * 
- * 
+ * partitioning methods such as shuffle, forward and keyBy.
+ *
+ * <p>
+ * Reduce-style operations, such as {@link #reduce}, {@link #sum} and {@link #fold} work on elements
+ * that have the same key.
+ *
  * @param <T> The type of the elements in the Keyed Stream.
  * @param <KEY> The type of the key in the Keyed Stream.
  */
@@ -59,7 +71,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 *            Function for determining state partitions
 	 */
 	public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
-		super(dataStream.getExecutionEnvironment(), new PartitionTransformation<T>(dataStream.getTransformation(), new HashPartitioner<T>(keySelector)));
+		super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
 		this.keySelector = keySelector;
 	}
 
@@ -157,4 +169,281 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
 		return new WindowedStream<>(this, assigner);
 	}
+
+	// ------------------------------------------------------------------------
+	//  Non-Windowed aggregation operations
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Applies a reduce transformation on the grouped data stream grouped on by
+	 * the given key position. The {@link ReduceFunction} will receive input
+	 * values based on the key value. Only input values with the same key will
+	 * go to the same reducer.
+	 *
+	 * @param reducer
+	 *            The {@link ReduceFunction} that will be called for every
+	 *            element of the input values with the same key.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer) {
+		return transform("Keyed Reduce", getType(), new StreamGroupedReduce<>(clean(reducer), keySelector));
+	}
+
+	/**
+	 * Applies a fold transformation on the grouped data stream grouped on by
+	 * the given key position. The {@link FoldFunction} will receive input
+	 * values based on the key value. Only input values with the same key will
+	 * go to the same folder.
+	 *
+	 * @param folder
+	 *            The {@link FoldFunction} that will be called for every element
+	 *            of the input values with the same key.
+	 * @param initialValue
+	 *            The initialValue passed to the folders for each key.
+	 * @return The transformed DataStream.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> folder) {
+
+		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
+				Utils.getCallLocationName(), true);
+
+		return transform("Keyed Fold", outType, new StreamGroupedFold<>(clean(folder),
+				keySelector, initialValue));
+	}
+
+	/**
+	 * Applies an aggregation that gives a rolling sum of the data stream at the
+	 * given position grouped by the given key. An independent aggregate is kept
+	 * per key.
+	 *
+	 * @param positionToSum
+	 *            The position in the data point to sum
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
+		return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current sum of the pojo data
+	 * stream at the given field expressionby the given key. An independent
+	 * aggregate is kept per key. A field expression is either the name of a
+	 * public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> sum(String field) {
+		return aggregate(new SumAggregator<>(field, getType(), getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current minimum of the data
+	 * stream at the given position by the given key. An independent aggregate
+	 * is kept per key.
+	 *
+	 * @param positionToMin
+	 *            The position in the data point to minimize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
+		return aggregate(new ComparableAggregator<>(positionToMin, getType(), AggregationFunction.AggregationType.MIN,
+				getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current minimum of the pojo
+	 * data stream at the given field expression by the given key. An
+	 * independent aggregate is kept per key. A field expression is either the
+	 * name of a public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> min(String field) {
+		return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MIN,
+				false, getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the current maximum of the data stream
+	 * at the given position by the given key. An independent aggregate is kept
+	 * per key.
+	 *
+	 * @param positionToMax
+	 *            The position in the data point to maximize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
+		return aggregate(new ComparableAggregator<>(positionToMax, getType(), AggregationFunction.AggregationType.MAX,
+				getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current maximum of the pojo
+	 * data stream at the given field expression by the given key. An
+	 * independent aggregate is kept per key. A field expression is either the
+	 * name of a public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> max(String field) {
+		return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAX,
+				false, getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current minimum element of the
+	 * pojo data stream by the given field expression by the given key. An
+	 * independent aggregate is kept per key. A field expression is either the
+	 * name of a public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @param first
+	 *            If True then in case of field equality the first object will
+	 *            be returned
+	 * @return The transformed DataStream.
+	 */
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) {
+		return aggregate(new ComparableAggregator(field, getType(), AggregationFunction.AggregationType.MINBY,
+				first, getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current maximum element of the
+	 * pojo data stream by the given field expression by the given key. An
+	 * independent aggregate is kept per key. A field expression is either the
+	 * name of a public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @param first
+	 *            If True then in case of field equality the first object will
+	 *            be returned
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) {
+		return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAXBY,
+				first, getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current element with the
+	 * minimum value at the given position by the given key. An independent
+	 * aggregate is kept per key. If more elements have the minimum value at the
+	 * given position, the operator returns the first one by default.
+	 *
+	 * @param positionToMinBy
+	 *            The position in the data point to minimize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
+		return this.minBy(positionToMinBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current element with the
+	 * minimum value at the given position by the given key. An independent
+	 * aggregate is kept per key. If more elements have the minimum value at the
+	 * given position, the operator returns the first one by default.
+	 *
+	 * @param positionToMinBy
+	 *            The position in the data point to minimize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
+		return this.minBy(positionToMinBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current element with the
+	 * minimum value at the given position by the given key. An independent
+	 * aggregate is kept per key. If more elements have the minimum value at the
+	 * given position, the operator returns either the first or last one,
+	 * depending on the parameter set.
+	 *
+	 * @param positionToMinBy
+	 *            The position in the data point to minimize
+	 * @param first
+	 *            If true, then the operator return the first element with the
+	 *            minimal value, otherwise returns the last
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) {
+		return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationFunction.AggregationType.MINBY, first,
+				getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current element with the
+	 * maximum value at the given position by the given key. An independent
+	 * aggregate is kept per key. If more elements have the maximum value at the
+	 * given position, the operator returns the first one by default.
+	 *
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current element with the
+	 * maximum value at the given position by the given key. An independent
+	 * aggregate is kept per key. If more elements have the maximum value at the
+	 * given position, the operator returns the first one by default.
+	 *
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current element with the
+	 * maximum value at the given position by the given key. An independent
+	 * aggregate is kept per key. If more elements have the maximum value at the
+	 * given position, the operator returns either the first or last one,
+	 * depending on the parameter set.
+	 *
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize.
+	 * @param first
+	 *            If true, then the operator return the first element with the
+	 *            maximum value, otherwise returns the last
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) {
+		return aggregate(new ComparableAggregator<>(positionToMaxBy, getType(), AggregationFunction.AggregationType.MAXBY, first,
+				getExecutionConfig()));
+	}
+
+	protected SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregate) {
+		StreamGroupedReduce<T> operator = new StreamGroupedReduce<>(clean(aggregate), keySelector);
+		return transform("Keyed Aggregation", getType(), operator);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index ef6f53b..c1c5f6d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -89,7 +89,7 @@ public class WindowedDataStream<T> {
 	protected boolean isLocal = false;
 
 	protected KeySelector<T, ?> discretizerKey;
-	protected KeySelector<T, ?> groupByKey;
+	protected KeySelector<T, ?> keyByKey;
 
 	protected WindowingHelper<T> triggerHelper;
 	protected WindowingHelper<T> evictionHelper;
@@ -101,8 +101,8 @@ public class WindowedDataStream<T> {
 		this.dataStream = dataStream;
 		this.triggerHelper = policyHelper;
 
-		if (dataStream instanceof GroupedDataStream) {
-			this.discretizerKey = ((GroupedDataStream<T, ?>) dataStream).keySelector;
+		if (dataStream instanceof KeyedStream) {
+			this.discretizerKey = ((KeyedStream<T, ?>) dataStream).keySelector;
 		}
 	}
 
@@ -113,15 +113,15 @@ public class WindowedDataStream<T> {
 		this.userTrigger = trigger;
 		this.userEvicter = evicter;
 
-		if (dataStream instanceof GroupedDataStream) {
-			this.discretizerKey = ((GroupedDataStream<T, ?>) dataStream).keySelector;
+		if (dataStream instanceof KeyedStream) {
+			this.discretizerKey = ((KeyedStream<T, ?>) dataStream).keySelector;
 		}
 	}
 
 	protected WindowedDataStream(WindowedDataStream<T> windowedDataStream) {
 		this.dataStream = windowedDataStream.dataStream;
 		this.discretizerKey = windowedDataStream.discretizerKey;
-		this.groupByKey = windowedDataStream.groupByKey;
+		this.keyByKey = windowedDataStream.keyByKey;
 		this.triggerHelper = windowedDataStream.triggerHelper;
 		this.evictionHelper = windowedDataStream.evictionHelper;
 		this.userTrigger = windowedDataStream.userTrigger;
@@ -170,11 +170,11 @@ public class WindowedDataStream<T> {
 	 *            The position of the fields to group by.
 	 * @return The grouped {@link WindowedDataStream}
 	 */
-	public WindowedDataStream<T> groupBy(int... fields) {
+	public WindowedDataStream<T> keyBy(int... fields) {
 		if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
-			return groupBy(new KeySelectorUtil.ArrayKeySelector<T>(fields));
+			return keyBy(new KeySelectorUtil.ArrayKeySelector<T>(fields));
 		} else {
-			return groupBy(new Keys.ExpressionKeys<T>(fields, getType()));
+			return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
 		}
 	}
 
@@ -194,8 +194,8 @@ public class WindowedDataStream<T> {
 	 *            The fields to group by
 	 * @return The grouped {@link WindowedDataStream}
 	 */
-	public WindowedDataStream<T> groupBy(String... fields) {
-		return groupBy(new Keys.ExpressionKeys<T>(fields, getType()));
+	public WindowedDataStream<T> keyBy(String... fields) {
+		return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
 	}
 
 	/**
@@ -210,14 +210,14 @@ public class WindowedDataStream<T> {
 	 *            The keySelector used to extract the key for grouping.
 	 * @return The grouped {@link WindowedDataStream}
 	 */
-	public WindowedDataStream<T> groupBy(KeySelector<T, ?> keySelector) {
+	public WindowedDataStream<T> keyBy(KeySelector<T, ?> keySelector) {
 		WindowedDataStream<T> ret = this.copy();
-		ret.groupByKey = keySelector;
+		ret.keyByKey = keySelector;
 		return ret;
 	}
 
-	private WindowedDataStream<T> groupBy(Keys<T> keys) {
-		return groupBy(clean(KeySelectorUtil.getSelectorForKeys(keys, getType(),
+	private WindowedDataStream<T> keyBy(Keys<T> keys) {
+		return keyBy(clean(KeySelectorUtil.getSelectorForKeys(keys, getType(),
 				getExecutionConfig())));
 	}
 
@@ -398,7 +398,7 @@ public class WindowedDataStream<T> {
 				.setParallelism(parallelism)
 				.transform(windowBuffer.getClass().getSimpleName(),
 						new StreamWindowTypeInfo<T>(getType()), bufferOperator)
-				.setParallelism(parallelism), groupByKey, transformation, false);
+				.setParallelism(parallelism), keyByKey, transformation, false);
 
 	}
 
@@ -442,8 +442,8 @@ public class WindowedDataStream<T> {
 		// If there is a groupby for the reduce operation we apply it before the
 		// discretizers, because we will forward everything afterwards to
 		// exploit task chaining
-		if (groupByKey != null) {
-			dataStream = dataStream.groupBy(groupByKey);
+		if (keyByKey != null) {
+			dataStream = dataStream.keyBy(keyByKey);
 		}
 
 		// We discretize the stream and call the timeReduce function of the
@@ -502,28 +502,28 @@ public class WindowedDataStream<T> {
 		if (transformation == WindowTransformation.REDUCEWINDOW) {
 			if (WindowUtils.isTumblingPolicy(trigger, eviction)) {
 				if (eviction instanceof KeepAllEvictionPolicy) {
-					if (groupByKey == null) {
+					if (keyByKey == null) {
 						return new TumblingPreReducer<T>(
 								(ReduceFunction<T>) transformation.getUDF(), getType()
 										.createSerializer(getExecutionConfig())).noEvict();
 					} else {
 						return new TumblingGroupedPreReducer<T>(
-								(ReduceFunction<T>) transformation.getUDF(), groupByKey,
+								(ReduceFunction<T>) transformation.getUDF(), keyByKey,
 								getType().createSerializer(getExecutionConfig())).noEvict();
 					}
 				} else {
-					if (groupByKey == null) {
+					if (keyByKey == null) {
 						return new TumblingPreReducer<T>(
 								(ReduceFunction<T>) transformation.getUDF(), getType()
 										.createSerializer(getExecutionConfig()));
 					} else {
 						return new TumblingGroupedPreReducer<T>(
-								(ReduceFunction<T>) transformation.getUDF(), groupByKey,
+								(ReduceFunction<T>) transformation.getUDF(), keyByKey,
 								getType().createSerializer(getExecutionConfig()));
 					}
 				}
 			} else if (WindowUtils.isSlidingCountPolicy(trigger, eviction)) {
-				if (groupByKey == null) {
+				if (keyByKey == null) {
 					return new SlidingCountPreReducer<T>(
 							clean((ReduceFunction<T>) transformation.getUDF()), dataStream
 									.getType().createSerializer(getExecutionConfig()),
@@ -532,13 +532,13 @@ public class WindowedDataStream<T> {
 				} else {
 					return new SlidingCountGroupedPreReducer<T>(
 							clean((ReduceFunction<T>) transformation.getUDF()), dataStream
-									.getType().createSerializer(getExecutionConfig()), groupByKey,
+									.getType().createSerializer(getExecutionConfig()), keyByKey,
 							WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
 							((CountTriggerPolicy<?>) trigger).getStart());
 				}
 
 			} else if (WindowUtils.isSlidingTimePolicy(trigger, eviction)) {
-				if (groupByKey == null) {
+				if (keyByKey == null) {
 					return new SlidingTimePreReducer<T>(
 							(ReduceFunction<T>) transformation.getUDF(), dataStream.getType()
 									.createSerializer(getExecutionConfig()),
@@ -547,25 +547,25 @@ public class WindowedDataStream<T> {
 				} else {
 					return new SlidingTimeGroupedPreReducer<T>(
 							(ReduceFunction<T>) transformation.getUDF(), dataStream.getType()
-									.createSerializer(getExecutionConfig()), groupByKey,
+									.createSerializer(getExecutionConfig()), keyByKey,
 							WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
 							WindowUtils.getTimeStampWrapper(trigger));
 				}
 
 			} else if (WindowUtils.isJumpingCountPolicy(trigger, eviction)) {
-				if (groupByKey == null) {
+				if (keyByKey == null) {
 					return new JumpingCountPreReducer<T>(
 							(ReduceFunction<T>) transformation.getUDF(), getType()
 									.createSerializer(getExecutionConfig()),
 							WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
 				} else {
 					return new JumpingCountGroupedPreReducer<T>(
-							(ReduceFunction<T>) transformation.getUDF(), groupByKey, getType()
+							(ReduceFunction<T>) transformation.getUDF(), keyByKey, getType()
 									.createSerializer(getExecutionConfig()),
 							WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
 				}
 			} else if (WindowUtils.isJumpingTimePolicy(trigger, eviction)) {
-				if (groupByKey == null) {
+				if (keyByKey == null) {
 					return new JumpingTimePreReducer<T>(
 							(ReduceFunction<T>) transformation.getUDF(), getType()
 									.createSerializer(getExecutionConfig()),
@@ -573,7 +573,7 @@ public class WindowedDataStream<T> {
 							WindowUtils.getTimeStampWrapper(trigger));
 				} else {
 					return new JumpingTimeGroupedPreReducer<T>(
-							(ReduceFunction<T>) transformation.getUDF(), groupByKey, getType()
+							(ReduceFunction<T>) transformation.getUDF(), keyByKey, getType()
 									.createSerializer(getExecutionConfig()),
 							WindowUtils.getSlideSize(trigger), WindowUtils.getWindowSize(eviction),
 							WindowUtils.getTimeStampWrapper(trigger));
@@ -845,7 +845,7 @@ public class WindowedDataStream<T> {
 	}
 
 	protected boolean isGrouped() {
-		return groupByKey != null;
+		return keyByKey != null;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
index 999d197..4a5622d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
@@ -211,8 +211,8 @@ public class StreamJoinOperator<I1, I2> extends
 //					op.input1.getType(), op.input2.getType());
 
 //			return new JoinedStream<I1, I2, Tuple2<I1, I2>>(this, op.input1
-//					.groupBy(keys1)
-//					.connect(op.input2.groupBy(keys2))
+//					.keyBy(keys1)
+//					.connect(op.input2.keyBy(keys2))
 //					.addGeneralWindowCombine(joinWindowFunction, outType, op.windowSize,
 //							op.slideInterval, op.timeStamp1, op.timeStamp2));
 			return null;
@@ -244,8 +244,8 @@ public class StreamJoinOperator<I1, I2> extends
 
 //				return new JoinedStream<I1, I2, OUT>(
 //						predicate, predicate.op.input1
-//						.groupBy(predicate.keys1)
-//						.connect(predicate.op.input2.groupBy(predicate.keys2))
+//						.keyBy(predicate.keys1)
+//						.connect(predicate.op.input2.keyBy(predicate.keys2))
 //						.addGeneralWindowCombine(joinWindowFunction, outType, predicate.op.windowSize,
 //								predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2));
 				return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
index 7ea1309..0f9cbe9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
@@ -59,7 +59,7 @@ public class CoStreamTest extends StreamingMultipleProgramsTestBase {
 			public boolean filter(Integer value) throws Exception {
 				return true;
 			}
-		}).groupBy(new KeySelector<Integer, Integer>() {
+		}).keyBy(new KeySelector<Integer, Integer>() {
 
 			private static final long serialVersionUID = 1L;
 
@@ -88,7 +88,7 @@ public class CoStreamTest extends StreamingMultipleProgramsTestBase {
 					public boolean filter(Tuple2<Integer, Integer> value) throws Exception {
 						return true;
 					}
-				}).disableChaining().groupBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
+				}).disableChaining().keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
 
 					private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 337d97b..55bf889 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -38,7 +38,7 @@ import org.apache.flink.streaming.api.datastream.ConnectedStreams;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.GroupedDataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.datastream.WindowedDataStream;
@@ -125,7 +125,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 	}
 
 	/**
-	 * Tests that {@link DataStream#groupBy} and {@link DataStream#partitionByHash} result in
+	 * Tests that {@link DataStream#keyBy} and {@link DataStream#partitionByHash} result in
 	 * different and correct topologies. Does the some for the {@link ConnectedStreams}.
 	 */
 	@Test
@@ -138,10 +138,10 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 		ConnectedStreams connected = src1.connect(src2);
 
 		//Testing DataStream grouping
-		DataStream group1 = src1.groupBy(0);
-		DataStream group2 = src1.groupBy(1, 0);
-		DataStream group3 = src1.groupBy("f0");
-		DataStream group4 = src1.groupBy(new FirstSelector());
+		DataStream group1 = src1.keyBy(0);
+		DataStream group2 = src1.keyBy(1, 0);
+		DataStream group3 = src1.keyBy("f0");
+		DataStream group4 = src1.keyBy(new FirstSelector());
 
 		int id1 = createDownStreamId(group1);
 		int id2 = createDownStreamId(group2);
@@ -153,10 +153,10 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), id3)));
 		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), id4)));
 
-		assertTrue(isGrouped(group1));
-		assertTrue(isGrouped(group2));
-		assertTrue(isGrouped(group3));
-		assertTrue(isGrouped(group4));
+		assertTrue(isKeyed(group1));
+		assertTrue(isKeyed(group2));
+		assertTrue(isKeyed(group3));
+		assertTrue(isKeyed(group4));
 
 		//Testing DataStream partitioning
 		DataStream partition1 = src1.partitionByHash(0);
@@ -174,10 +174,10 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), pid3)));
 		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), pid4)));
 
-		assertFalse(isGrouped(partition1));
-		assertFalse(isGrouped(partition3));
-		assertFalse(isGrouped(partition2));
-		assertFalse(isGrouped(partition4));
+		assertFalse(isKeyed(partition1));
+		assertFalse(isKeyed(partition3));
+		assertFalse(isKeyed(partition2));
+		assertFalse(isKeyed(partition4));
 
 		// Testing DataStream custom partitioning
 		Partitioner<Long> longPartitioner = new Partitioner<Long>() {
@@ -199,24 +199,24 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 		assertTrue(isCustomPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), cid2)));
 		assertTrue(isCustomPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), cid3)));
 
-		assertFalse(isGrouped(customPartition1));
-		assertFalse(isGrouped(customPartition3));
-		assertFalse(isGrouped(customPartition4));
+		assertFalse(isKeyed(customPartition1));
+		assertFalse(isKeyed(customPartition3));
+		assertFalse(isKeyed(customPartition4));
 
 		//Testing ConnectedStreams grouping
-		ConnectedStreams connectedGroup1 = connected.groupBy(0, 0);
+		ConnectedStreams connectedGroup1 = connected.keyBy(0, 0);
 		Integer downStreamId1 = createDownStreamId(connectedGroup1);
 
-		ConnectedStreams connectedGroup2 = connected.groupBy(new int[]{0}, new int[]{0});
+		ConnectedStreams connectedGroup2 = connected.keyBy(new int[]{0}, new int[]{0});
 		Integer downStreamId2 = createDownStreamId(connectedGroup2);
 
-		ConnectedStreams connectedGroup3 = connected.groupBy("f0", "f0");
+		ConnectedStreams connectedGroup3 = connected.keyBy("f0", "f0");
 		Integer downStreamId3 = createDownStreamId(connectedGroup3);
 
-		ConnectedStreams connectedGroup4 = connected.groupBy(new String[]{"f0"}, new String[]{"f0"});
+		ConnectedStreams connectedGroup4 = connected.keyBy(new String[]{"f0"}, new String[]{"f0"});
 		Integer downStreamId4 = createDownStreamId(connectedGroup4);
 
-		ConnectedStreams connectedGroup5 = connected.groupBy(new FirstSelector(), new FirstSelector());
+		ConnectedStreams connectedGroup5 = connected.keyBy(new FirstSelector(), new FirstSelector());
 		Integer downStreamId5 = createDownStreamId(connectedGroup5);
 
 		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId1)));
@@ -234,11 +234,11 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId5)));
 		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId5)));
 
-		assertTrue(isGrouped(connectedGroup1));
-		assertTrue(isGrouped(connectedGroup2));
-		assertTrue(isGrouped(connectedGroup3));
-		assertTrue(isGrouped(connectedGroup4));
-		assertTrue(isGrouped(connectedGroup5));
+		assertTrue(isKeyed(connectedGroup1));
+		assertTrue(isKeyed(connectedGroup2));
+		assertTrue(isKeyed(connectedGroup3));
+		assertTrue(isKeyed(connectedGroup4));
+		assertTrue(isKeyed(connectedGroup5));
 
 		//Testing ConnectedStreams partitioning
 		ConnectedStreams connectedPartition1 = connected.partitionByHash(0, 0);
@@ -281,11 +281,11 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
 				connectDownStreamId5)));
 
-		assertFalse(isGrouped(connectedPartition1));
-		assertFalse(isGrouped(connectedPartition2));
-		assertFalse(isGrouped(connectedPartition3));
-		assertFalse(isGrouped(connectedPartition4));
-		assertFalse(isGrouped(connectedPartition5));
+		assertFalse(isKeyed(connectedPartition1));
+		assertFalse(isKeyed(connectedPartition2));
+		assertFalse(isKeyed(connectedPartition3));
+		assertFalse(isKeyed(connectedPartition4));
+		assertFalse(isKeyed(connectedPartition5));
 	}
 
 	/**
@@ -601,8 +601,8 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 		return dataStream.print().getTransformation().getId();
 	}
 
-	private static boolean isGrouped(DataStream dataStream) {
-		return dataStream instanceof GroupedDataStream;
+	private static boolean isKeyed(DataStream dataStream) {
+		return dataStream instanceof KeyedStream;
 	}
 
 	private static Integer createDownStreamId(ConnectedStreams dataStream) {
@@ -621,8 +621,8 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 		return coMap.getId();
 	}
 
-	private static boolean isGrouped(ConnectedStreams dataStream) {
-		return (dataStream.getFirstInput() instanceof GroupedDataStream && dataStream.getSecondInput() instanceof GroupedDataStream);
+	private static boolean isKeyed(ConnectedStreams dataStream) {
+		return (dataStream.getFirstInput() instanceof KeyedStream && dataStream.getSecondInput() instanceof KeyedStream);
 	}
 
 	private static boolean isPartitioned(StreamEdge edge) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 774f58d..43371b7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -401,7 +401,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 				.withFeedbackType("String");
 
 		try {
-			coIt.groupBy(1, 2);
+			coIt.keyBy(1, 2);
 			fail();
 		} catch (InvalidProgramException e) {
 			// this is expected
@@ -476,7 +476,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 		DataStream<Integer> source = env.fromElements(1, 2, 3)
 				.map(NoOpIntMap).name("ParallelizeMap");
 
-		IterativeDataStream<Integer> it = source.groupBy(key).iterate(3000);
+		IterativeDataStream<Integer> it = source.keyBy(key).iterate(3000);
 
 		DataStream<Integer> head = it.flatMap(new RichFlatMapFunction<Integer, Integer>() {
 
@@ -502,7 +502,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 			}
 		});
 
-		it.closeWith(head.groupBy(key).union(head.map(NoOpIntMap).groupBy(key))).addSink(new ReceiveCheckNoOpSink<Integer>());
+		it.closeWith(head.keyBy(key).union(head.map(NoOpIntMap).keyBy(key))).addSink(new ReceiveCheckNoOpSink<Integer>());
 
 		env.execute();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
index 11100a4..6401546 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
@@ -78,7 +78,7 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 		DataStream<Tuple2<Integer, Integer>> sourceStream = env.addSource(new TupleSource(numElements, numKeys));
 
 		SplitDataStream<Tuple2<Integer, Integer>> splittedResult = sourceStream
-			.groupBy(0)
+			.keyBy(0)
 			.fold(0, new FoldFunction<Tuple2<Integer, Integer>, Integer>() {
 				@Override
 				public Integer fold(Integer accumulator, Tuple2<Integer, Integer> value) throws Exception {
@@ -146,7 +146,7 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 		DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements));
 
 		input
-			.groupBy(0)
+			.keyBy(0)
 			.fold(
 				new NonSerializable(42),
 				new FoldFunction<Tuple2<Integer, NonSerializable>, NonSerializable>() {