You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/05 16:42:43 UTC

[09/13] flink git commit: [FLINK-2550] Remove groupBy and GroupedDataStream

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index abc1a18..6c439b9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -194,7 +194,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		DataStream<OuterPojo> sourceStream22 = env.addSource(new PojoSource());
 
 		sourceStream21
-				.groupBy(2, 2)
+				.keyBy(2, 2)
 				.window(Time.of(10, new MyTimestamp(), 0))
 				.every(Time.of(4, new MyTimestamp(), 0))
 				.maxBy(3)
@@ -260,7 +260,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 				return new Tuple2<Long, Integer>(value, 1);
 			}
 		})
-				.groupBy(0)
+				.keyBy(0)
 				.window(Count.of(10000)).sum(1).flatten()
 				.filter(new FilterFunction<Tuple2<Long, Integer>>() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
index 512a0df..19a61ba 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
@@ -179,7 +179,7 @@ public class SelfConnectionTest extends StreamingMultipleProgramsTestBase {
 			public void flatMap(Integer value, Collector<String> out) throws Exception {
 				out.collect("x " + value);
 			}
-		}).groupBy(new KeySelector<String, Integer>() {
+		}).keyBy(new KeySelector<String, Integer>() {
 
 			private static final long serialVersionUID = 1L;
 
@@ -197,7 +197,7 @@ public class SelfConnectionTest extends StreamingMultipleProgramsTestBase {
 			public Long map(Integer value) throws Exception {
 				return Long.valueOf(value + 1);
 			}
-		}).groupBy(new KeySelector<Long, Long>() {
+		}).keyBy(new KeySelector<Long, Long>() {
 
 			private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java
index b762d65..2c06c00 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java
@@ -69,7 +69,7 @@ public class ParallelMergeITCase extends StreamingProgramTestBase {
 		DataStream<Tuple2<String, Integer>> counts =
 				text.flatMap(new Tokenizer())
 						.window(Time.of(1000, TimeUnit.MILLISECONDS))
-						.groupBy(0)
+						.keyBy(0)
 						.sum(1)
 						.flatten();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
index db09373..4611966 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
@@ -107,15 +107,15 @@ public class WindowingITCase extends StreamingMultipleProgramsTestBase {
 		source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 1)).sum(0).getDiscretizedStream()
 				.addSink(new TestSink1());
 
-		source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2))
+		source.window(Time.of(4, ts, 1)).keyBy(new ModKey(2))
 				.mapWindow(new IdentityWindowMap())
 				.flatten()
 				.addSink(new TestSink2()).name("TESTSIUNK2");
 
-		source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
+		source.keyBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
 				.addSink(new TestSink4());
 
-		source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2))
+		source.keyBy(new ModKey(3)).window(Count.of(2)).keyBy(new ModKey(2))
 				.mapWindow(new IdentityWindowMap())
 				.flatten()
 				.addSink(new TestSink5());
@@ -123,14 +123,14 @@ public class WindowingITCase extends StreamingMultipleProgramsTestBase {
 		source.window(Time.of(2, ts)).every(Time.of(3, ts)).min(0).getDiscretizedStream()
 				.addSink(new TestSink3());
 
-		source.groupBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream()
+		source.keyBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream()
 				.addSink(new TestSink6());
 
 		source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap())
 				.flatten()
 				.addSink(new TestSink7());
 
-		source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).groupBy(new ModKey(2)).sum(0)
+		source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).keyBy(new ModKey(2)).sum(0)
 				.getDiscretizedStream()
 				.addSink(new TestSink8());
 
@@ -152,7 +152,7 @@ public class WindowingITCase extends StreamingMultipleProgramsTestBase {
 
 		source.every(Count.of(4)).sum(0).getDiscretizedStream().addSink(new TestSink11());
 
-		source.window(FullStream.window()).every(Count.of(4)).groupBy(key).sum(0)
+		source.window(FullStream.window()).every(Count.of(4)).keyBy(key).sum(0)
 				.getDiscretizedStream()
 				.addSink(new TestSink12());
 
@@ -197,7 +197,7 @@ public class WindowingITCase extends StreamingMultipleProgramsTestBase {
 
 		source2.window(Time.of(2, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink9());
 
-		source3.window(Time.of(5, ts, 1)).groupBy(new ModKey(2)).sum(0).getDiscretizedStream()
+		source3.window(Time.of(5, ts, 1)).keyBy(new ModKey(2)).sum(0).getDiscretizedStream()
 				.addSink(new TestSink10());
 
 		source

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
index 7ac5616..5377e09 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
@@ -42,7 +42,7 @@ public class CsvOutputFormatITCase extends StreamingProgramTestBase {
 
 		DataStream<Tuple2<String, Integer>> counts = text
 				.flatMap(new Tokenizer())
-				.groupBy(0).sum(1);
+				.keyBy(0).sum(1);
 
 		counts.writeAsCsv(resultPath);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
index bf96cc1..49876ec 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
@@ -38,7 +38,7 @@ public class SocketOutputFormatITCase extends SocketOutputTestBase {
 
 		DataStream<String> counts =
 				text.flatMap(new CsvOutputFormatITCase.Tokenizer())
-						.groupBy(0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {
+						.keyBy(0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {
 					@Override
 					public String map(Tuple2<String, Integer> value) throws Exception {
 						return value.toString() + "\n";

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
index 6bbcea8..380f00d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
@@ -40,7 +40,7 @@ public class TextOutputFormatITCase extends StreamingProgramTestBase {
 
 		DataStream<Tuple2<String, Integer>> counts = text
 				.flatMap(new CsvOutputFormatITCase.Tokenizer())
-				.groupBy(0).sum(1);
+				.keyBy(0).sum(1);
 
 		counts.writeAsText(resultPath);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
index 1473097..17add2c 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
@@ -66,7 +66,7 @@ public class SocketTextStreamWordCount {
 				// split up the lines in pairs (2-tuples) containing: (word,1)
 				text.flatMap(new Tokenizer())
 						// group by the tuple field "0" and sum up tuple field "1"
-						.groupBy(0)
+						.keyBy(0)
 						.sum(1);
 
 		if (fileOutput) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
index 32a2dfe..c2477b5 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
@@ -69,7 +69,7 @@ public class TwitterStream {
 				// selecting English tweets and splitting to (word, 1)
 				.flatMap(new SelectEnglishAndTokenizeFlatMap())
 				// group by words and sum their occurrences
-				.groupBy(0).sum(1);
+				.keyBy(0).sum(1);
 
 		// emit result
 		if (fileOutput) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index f8d8652..4730cc1 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -76,7 +76,7 @@ public class SessionWindowing {
 				});
 
 		// We create sessions for each id with max timeout of 3 time units
-		DataStream<Tuple3<String, Long, Integer>> aggregated = source.groupBy(0)
+		DataStream<Tuple3<String, Long, Integer>> aggregated = source.keyBy(0)
 				.window(new SessionTriggerPolicy(3L),
 						new TumblingEvictionPolicy<Tuple3<String, Long, Integer>>()).sum(2)
 				.flatten();

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index 1b48387..55d48dd 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -60,7 +60,7 @@ public class TopSpeedWindowing {
 		} else {
 			carData = env.addSource(CarSource.create(numOfCars));
 		}
-		DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData.groupBy(0)
+		DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData.keyBy(0)
 				.window(Time.of(evictionSec * 1000, new CarTimestamp()))
 				.every(Delta.of(triggerMeters,
 						new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
index bd3acc6..023a36a 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
@@ -72,7 +72,7 @@ public class WindowWordCount {
 				// create windows of windowSize records slided every slideSize records
 				.window(Count.of(windowSize)).every(Count.of(slideSize))
 				// group by the tuple field "0" and sum up tuple field "1"
-				.groupBy(0).sum(1)
+				.keyBy(0).sum(1)
 				// flatten the windows to a single stream
 				.flatten();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
index 5ff3fc1..591ef51 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
@@ -63,7 +63,7 @@ public class PojoExample {
 		// split up the lines into Word objects
 		text.flatMap(new Tokenizer())
 		// group by the field word and sum up the frequency
-				.groupBy("word").sum("frequency");
+				.keyBy("word").sum("frequency");
 
 		if (fileOutput) {
 			counts.writeAsText(outputPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
index c207d60..a594c94 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -67,7 +67,7 @@ public class WordCount {
 		// split up the lines in pairs (2-tuples) containing: (word,1)
 		text.flatMap(new Tokenizer())
 		// group by the tuple field "0" and sum up tuple field "1"
-				.groupBy(0).sum(1);
+				.keyBy(0).sum(1);
 
 		// emit result
 		if (fileOutput) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
index 4dbc3fb..9ec17d4 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
@@ -57,7 +57,7 @@ object SocketTextStreamWordCount {
     val text = env.socketTextStream(hostName, port)
     val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
       .map { (_, 1) }
-      .groupBy(0)
+      .keyBy(0)
       .sum(1)
 
     if (fileOutput) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
index 9603f71..8e3c7d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -56,7 +56,7 @@ object TopSpeedWindowing {
 
     val cars = setCarsInput(env)
 
-    val topSeed = cars.groupBy("carId")
+    val topSeed = cars.keyBy("carId")
       .window(Time.of(evictionSec * 1000, (car : CarEvent) => car.time))
       .every(Delta.of[CarEvent](triggerMeters,
           (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0)))

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index 41c1a7a..4727cc5 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -169,8 +169,8 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * second input stream.
    * @return @return The transformed { @link ConnectedStreams}
    */
-  def groupBy(keyPosition1: Int, keyPosition2: Int): ConnectedStreams[IN1, IN2] = {
-    javaStream.groupBy(keyPosition1, keyPosition2)
+  def keyBy(keyPosition1: Int, keyPosition2: Int): ConnectedStreams[IN1, IN2] = {
+    javaStream.keyBy(keyPosition1, keyPosition2)
   }
 
   /**
@@ -185,9 +185,9 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * The fields used to group the second input stream.
    * @return @return The transformed { @link ConnectedStreams}
    */
-  def groupBy(keyPositions1: Array[Int], keyPositions2: Array[Int]): 
+  def keyBy(keyPositions1: Array[Int], keyPositions2: Array[Int]):
   ConnectedStreams[IN1, IN2] = {
-    javaStream.groupBy(keyPositions1, keyPositions2)
+    javaStream.keyBy(keyPositions1, keyPositions2)
   }
 
   /**
@@ -203,8 +203,8 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * The grouping expression for the second input
    * @return The grouped { @link ConnectedStreams}
    */
-  def groupBy(field1: String, field2: String): ConnectedStreams[IN1, IN2] = {
-    javaStream.groupBy(field1, field2)
+  def keyBy(field1: String, field2: String): ConnectedStreams[IN1, IN2] = {
+    javaStream.keyBy(field1, field2)
   }
 
   /**
@@ -221,9 +221,9 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * The grouping expressions for the second input
    * @return The grouped { @link ConnectedStreams}
    */
-  def groupBy(fields1: Array[String], fields2: Array[String]): 
+  def keyBy(fields1: Array[String], fields2: Array[String]):
   ConnectedStreams[IN1, IN2] = {
-    javaStream.groupBy(fields1, fields2)
+    javaStream.keyBy(fields1, fields2)
   }
 
   /**
@@ -238,7 +238,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * The function used for grouping the second input
    * @return The grouped { @link ConnectedStreams}
    */
-  def groupBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
+  def keyBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
   ConnectedStreams[IN1, IN2] = {
 
     val cleanFun1 = clean(fun1)
@@ -250,7 +250,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
       def getKey(in: IN2) = cleanFun2(in)
     }
 
-    javaStream.groupBy(keyExtractor1, keyExtractor2)
+    javaStream.keyBy(keyExtractor1, keyExtractor2)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index c9aee61..07828db 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -209,58 +209,31 @@ class DataStream[T](javaStream: JavaStream[T]) {
    */
   def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2] =
     javaStream.connect(dataStream.getJavaStream)
-
-
-
-  /**
-   * Partitions the operator states of the DataStream by the given key positions 
-   * (for tuple/array types).
-   */
-  def keyBy(fields: Int*): DataStream[T] = javaStream.keyBy(fields: _*)
-
-  /**
-   *
-   * Partitions the operator states of the DataStream by the given field expressions.
-   */
-  def keyBy(firstField: String, otherFields: String*): DataStream[T] =
-    javaStream.keyBy(firstField +: otherFields.toArray: _*)
-
-
-  /**
-   * Partitions the operator states of the DataStream by the given K key. 
-   */
-  def keyBy[K: TypeInformation](fun: T => K): DataStream[T] = {
-    val cleanFun = clean(fun)
-    val keyExtractor = new KeySelector[T, K] {
-      def getKey(in: T) = cleanFun(in)
-    }
-    javaStream.keyBy(keyExtractor)
-  }
   
   /**
    * Groups the elements of a DataStream by the given key positions (for tuple/array types) to
    * be used with grouped operators like grouped reduce or grouped aggregations.
    */
-  def groupBy(fields: Int*): GroupedDataStream[T, JavaTuple] = javaStream.groupBy(fields: _*)
+  def keyBy(fields: Int*): KeyedStream[T, JavaTuple] = javaStream.keyBy(fields: _*)
 
   /**
    * Groups the elements of a DataStream by the given field expressions to
    * be used with grouped operators like grouped reduce or grouped aggregations.
    */
-  def groupBy(firstField: String, otherFields: String*): GroupedDataStream[T, JavaTuple] =
-   javaStream.groupBy(firstField +: otherFields.toArray: _*)   
+  def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple] =
+   javaStream.keyBy(firstField +: otherFields.toArray: _*)   
   
   /**
    * Groups the elements of a DataStream by the given K key to
    * be used with grouped operators like grouped reduce or grouped aggregations.
    */
-  def groupBy[K: TypeInformation](fun: T => K): GroupedDataStream[T, K] = {
+  def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K] = {
 
     val cleanFun = clean(fun)
     val keyExtractor = new KeySelector[T, K] {
       def getKey(in: T) = cleanFun(in)
     }
-    javaStream.groupBy(keyExtractor)
+    javaStream.keyBy(keyExtractor)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
deleted file mode 100644
index e1a963d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.streaming.api.datastream.{ GroupedDataStream => GroupedJavaStream, DataStream => JavaStream }
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
-import org.apache.flink.streaming.api.functions.aggregation.SumAggregator
-import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator
-import org.apache.flink.streaming.api.operators.StreamGroupedReduce
-import scala.reflect.ClassTag
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.functions.FoldFunction
-import org.apache.flink.api.common.functions.ReduceFunction
-
-
-class GroupedDataStream[T, K](javaStream: GroupedJavaStream[T, K]) 
-  extends DataStream[T](javaStream) {
- 
-  /**
-   * Creates a new [[DataStream]] by reducing the elements of this DataStream
-   * using an associative reduce function. An independent aggregate is kept per key.
-   */
-  def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
-    if (reducer == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
- 
-    javaStream.reduce(reducer)
-  }
-
-  /**
-   * Creates a new [[DataStream]] by reducing the elements of this DataStream
-   * using an associative reduce function. An independent aggregate is kept per key.
-   */
-  def reduce(fun: (T, T) => T): DataStream[T] = {
-    if (fun == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-    val cleanFun = clean(fun)
-    val reducer = new ReduceFunction[T] {
-      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
-    }
-    reduce(reducer)
-  }
-
-  /**
-   * Creates a new [[DataStream]] by folding the elements of this DataStream
-   * using an associative fold function and an initial value. An independent 
-   * aggregate is kept per key.
-   */
-  def fold[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[T,R]): 
-  DataStream[R] = {
-    if (folder == null) {
-      throw new NullPointerException("Fold function must not be null.")
-    }
-    
-    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
-    
-    javaStream.fold(initialValue, folder).
-      returns(outType).asInstanceOf[JavaStream[R]]
-  }
-
-  /**
-   * Creates a new [[DataStream]] by folding the elements of this DataStream
-   * using an associative fold function and an initial value. An independent 
-   * aggregate is kept per key.
-   */
-  def fold[R: TypeInformation: ClassTag](initialValue: R, fun: (R,T) => R): DataStream[R] = {
-    if (fun == null) {
-      throw new NullPointerException("Fold function must not be null.")
-    }
-    val cleanFun = clean(fun)
-    val folder = new FoldFunction[T,R] {
-      def fold(acc: R, v: T) = {
-        cleanFun(acc, v)
-      }
-    }
-    fold(initialValue, folder)
-  }
-  
-  /**
-   * Applies an aggregation that that gives the current maximum of the data stream at
-   * the given position by the given key. An independent aggregate is kept per key.
-   *
-   */
-  def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
-  
-  /**
-   * Applies an aggregation that that gives the current maximum of the data stream at
-   * the given field by the given key. An independent aggregate is kept per key.
-   *
-   */
-  def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
-  
-  /**
-   * Applies an aggregation that that gives the current minimum of the data stream at
-   * the given position by the given key. An independent aggregate is kept per key.
-   *
-   */
-  def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
-  
-  /**
-   * Applies an aggregation that that gives the current minimum of the data stream at
-   * the given field by the given key. An independent aggregate is kept per key.
-   *
-   */
-  def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
-
-  /**
-   * Applies an aggregation that sums the data stream at the given position by the given 
-   * key. An independent aggregate is kept per key.
-   *
-   */
-  def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
-  
-  /**
-   * Applies an aggregation that sums the data stream at the given field by the given 
-   * key. An independent aggregate is kept per key.
-   *
-   */
-  def sum(field: String): DataStream[T] =  aggregate(AggregationType.SUM, field)
-
-  /**
-   * Applies an aggregation that that gives the current minimum element of the data stream by
-   * the given position by the given key. An independent aggregate is kept per key. 
-   * When equality, the first element is returned with the minimal value.
-   *
-   */
-  def minBy(position: Int): DataStream[T] = aggregate(AggregationType
-    .MINBY, position)
-    
-   /**
-   * Applies an aggregation that that gives the current minimum element of the data stream by
-   * the given field by the given key. An independent aggregate is kept per key.
-   * When equality, the first element is returned with the minimal value.
-   *
-   */
-  def minBy(field: String): DataStream[T] = aggregate(AggregationType
-    .MINBY, field )
-
-   /**
-   * Applies an aggregation that that gives the current maximum element of the data stream by
-   * the given position by the given key. An independent aggregate is kept per key. 
-   * When equality, the first element is returned with the maximal value.
-   *
-   */
-  def maxBy(position: Int): DataStream[T] =
-    aggregate(AggregationType.MAXBY, position)
-    
-   /**
-   * Applies an aggregation that that gives the current maximum element of the data stream by
-   * the given field by the given key. An independent aggregate is kept per key. 
-   * When equality, the first element is returned with the maximal value.
-   *
-   */
-  def maxBy(field: String): DataStream[T] =
-    aggregate(AggregationType.MAXBY, field)
-    
-  private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
-    val position = fieldNames2Indices(javaStream.getType(), Array(field))(0)
-    aggregate(aggregationType, position)
-  }
-
-  private def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = {
-
-    val reducer = aggregationType match {
-      case AggregationType.SUM =>
-        new SumAggregator(position, javaStream.getType, javaStream.getExecutionConfig)
-      case _ =>
-        new ComparableAggregator(position, javaStream.getType, aggregationType, true,
-          javaStream.getExecutionConfig)
-    }
-
-    val invokable =  new StreamGroupedReduce[T](reducer,javaStream.getKeySelector())
-     
-    new DataStream[T](javaStream.transform("aggregation", javaStream.getType(),invokable))
-      .asInstanceOf[DataStream[T]]
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
new file mode 100644
index 0000000..25244cd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.streaming.api.datastream.{ KeyedStream => KeyedJavaStream, DataStream => JavaStream }
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
+import org.apache.flink.streaming.api.functions.aggregation.SumAggregator
+import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator
+import org.apache.flink.streaming.api.operators.StreamGroupedReduce
+import scala.reflect.ClassTag
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.functions.FoldFunction
+import org.apache.flink.api.common.functions.ReduceFunction
+
+
+class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T](javaStream) {
+ 
+  /**
+   * Creates a new [[DataStream]] by reducing the elements of this DataStream
+   * using an associative reduce function. An independent aggregate is kept per key.
+   */
+  def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
+    if (reducer == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+ 
+    javaStream.reduce(reducer)
+  }
+
+  /**
+   * Creates a new [[DataStream]] by reducing the elements of this DataStream
+   * using an associative reduce function. An independent aggregate is kept per key.
+   */
+  def reduce(fun: (T, T) => T): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    val cleanFun = clean(fun)
+    val reducer = new ReduceFunction[T] {
+      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+    }
+    reduce(reducer)
+  }
+
+  /**
+   * Creates a new [[DataStream]] by folding the elements of this DataStream
+   * using an associative fold function and an initial value. An independent 
+   * aggregate is kept per key.
+   */
+  def fold[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[T,R]): 
+  DataStream[R] = {
+    if (folder == null) {
+      throw new NullPointerException("Fold function must not be null.")
+    }
+    
+    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
+    
+    javaStream.fold(initialValue, folder).
+      returns(outType).asInstanceOf[JavaStream[R]]
+  }
+
+  /**
+   * Creates a new [[DataStream]] by folding the elements of this DataStream
+   * using an associative fold function and an initial value. An independent 
+   * aggregate is kept per key.
+   */
+  def fold[R: TypeInformation: ClassTag](initialValue: R, fun: (R,T) => R): DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("Fold function must not be null.")
+    }
+    val cleanFun = clean(fun)
+    val folder = new FoldFunction[T,R] {
+      def fold(acc: R, v: T) = {
+        cleanFun(acc, v)
+      }
+    }
+    fold(initialValue, folder)
+  }
+  
+  /**
+   * Applies an aggregation that gives the current maximum of the data stream at
+   * the given position by the given key. An independent aggregate is kept per key.
+   *
+   */
+  def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
+  
+  /**
+   * Applies an aggregation that gives the current maximum of the data stream at
+   * the given field by the given key. An independent aggregate is kept per key.
+   *
+   */
+  def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
+  
+  /**
+   * Applies an aggregation that gives the current minimum of the data stream at
+   * the given position by the given key. An independent aggregate is kept per key.
+   *
+   */
+  def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
+  
+  /**
+   * Applies an aggregation that gives the current minimum of the data stream at
+   * the given field by the given key. An independent aggregate is kept per key.
+   *
+   */
+  def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
+
+  /**
+   * Applies an aggregation that sums the data stream at the given position by the given 
+   * key. An independent aggregate is kept per key.
+   *
+   */
+  def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
+  
+  /**
+   * Applies an aggregation that sums the data stream at the given field by the given 
+   * key. An independent aggregate is kept per key.
+   *
+   */
+  def sum(field: String): DataStream[T] =  aggregate(AggregationType.SUM, field)
+
+  /**
+   * Applies an aggregation that gives the current minimum element of the data stream by
+   * the given position by the given key. An independent aggregate is kept per key.
+   * In case of a tie, the first element with the minimal value is returned.
+   *
+   */
+  def minBy(position: Int): DataStream[T] = aggregate(AggregationType
+    .MINBY, position)
+    
+   /**
+   * Applies an aggregation that gives the current minimum element of the data stream by
+   * the given field by the given key. An independent aggregate is kept per key.
+   * In case of a tie, the first element with the minimal value is returned.
+   *
+   */
+  def minBy(field: String): DataStream[T] = aggregate(AggregationType
+    .MINBY, field )
+
+   /**
+   * Applies an aggregation that gives the current maximum element of the data stream by
+   * the given position by the given key. An independent aggregate is kept per key.
+   * In case of a tie, the first element with the maximal value is returned.
+   *
+   */
+  def maxBy(position: Int): DataStream[T] =
+    aggregate(AggregationType.MAXBY, position)
+    
+   /**
+   * Applies an aggregation that gives the current maximum element of the data stream by
+   * the given field by the given key. An independent aggregate is kept per key.
+   * In case of a tie, the first element with the maximal value is returned.
+   *
+   */
+  def maxBy(field: String): DataStream[T] =
+    aggregate(AggregationType.MAXBY, field)
+    
+  private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
+    val position = fieldNames2Indices(javaStream.getType(), Array(field))(0)
+    aggregate(aggregationType, position)
+  }
+
+  private def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = {
+
+    val reducer = aggregationType match {
+      case AggregationType.SUM =>
+        new SumAggregator(position, javaStream.getType, javaStream.getExecutionConfig)
+      case _ =>
+        new ComparableAggregator(position, javaStream.getType, aggregationType, true,
+          javaStream.getExecutionConfig)
+    }
+
+    val invokable =  new StreamGroupedReduce[T](reducer,javaStream.getKeySelector())
+     
+    new DataStream[T](javaStream.transform("aggregation", javaStream.getType(),invokable))
+      .asInstanceOf[DataStream[T]]
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
index f584767..e0bbaf8 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -151,7 +151,7 @@ object StreamJoinOperator {
     private def createJoinOperator(): JavaStream[(I1, I2)] = {
 
 //      val returnType = createTuple2TypeInformation[I1, I2](op.input1.getType, op.input2.getType)
-//      op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
+//      op.input1.keyBy(keys1).connect(op.input2.keyBy(keys2))
 //        .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
 //          returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
       null
@@ -172,7 +172,7 @@ object StreamJoinOperator {
 
       val cleanFun = clean(getJoinWindowFunction(jp, fun))
 
-//      op.input1.groupBy(jp.keys1).connect(op.input2.groupBy(jp.keys2))
+//      op.input1.keyBy(jp.keys1).connect(op.input2.keyBy(jp.keys2))
 //        .addGeneralWindowCombine[R](
 //          cleanFun,
 //          implicitly[TypeInformation[R]],

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
index f4d2154..8ef94f0 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
@@ -80,7 +80,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
    * DataStream.window(...) operator on an already grouped data stream.
    *
    */
-  def groupBy(fields: Int*): WindowedDataStream[T] = javaStream.groupBy(fields: _*)
+  def keyBy(fields: Int*): WindowedDataStream[T] = javaStream.keyBy(fields: _*)
 
   /**
    * Groups the elements of the WindowedDataStream using the given
@@ -91,8 +91,8 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
    * DataStream.window(...) operator on an already grouped data stream.
    *
    */
-  def groupBy(firstField: String, otherFields: String*): WindowedDataStream[T] =
-   javaStream.groupBy(firstField +: otherFields.toArray: _*)   
+  def keyBy(firstField: String, otherFields: String*): WindowedDataStream[T] =
+   javaStream.keyBy(firstField +: otherFields.toArray: _*)
     
   /**
    * Groups the elements of the WindowedDataStream using the given
@@ -103,13 +103,13 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
    * DataStream.window(...) operator on an already grouped data stream.
    *
    */
-  def groupBy[K: TypeInformation](fun: T => K): WindowedDataStream[T] = {
+  def keyBy[K: TypeInformation](fun: T => K): WindowedDataStream[T] = {
 
     val cleanFun = clean(fun)
     val keyExtractor = new KeySelector[T, K] {
       def getKey(in: T) = cleanFun(in)
     }
-    javaStream.groupBy(keyExtractor)
+    javaStream.keyBy(keyExtractor)
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index b8a3b94..625678a 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -26,8 +26,8 @@ import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
 import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
 import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
 import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
-import org.apache.flink.streaming.api.datastream.{ ConnectedStreams => JavaConStream }
-import org.apache.flink.streaming.api.datastream.{ GroupedDataStream => GroupedJavaStream }
+import org.apache.flink.streaming.api.datastream.{ ConnectedStreams => ConnectedJavaStreams }
+import org.apache.flink.streaming.api.datastream.{ KeyedStream => KeyedJavaStream }
 import language.implicitConversions
 
 package object scala {
@@ -38,8 +38,8 @@ package object scala {
   implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
     new DataStream[R](javaStream)
     
-  implicit def javaToScalaGroupedStream[R, K](javaStream: GroupedJavaStream[R, K]): 
-  GroupedDataStream[R, K] = new GroupedDataStream[R, K](javaStream)    
+  implicit def javaToScalaGroupedStream[R, K](javaStream: KeyedJavaStream[R, K]):
+  KeyedStream[R, K] = new KeyedStream[R, K](javaStream)
 
   implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] =
     new WindowedDataStream[R](javaWStream)
@@ -47,7 +47,7 @@ package object scala {
   implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R] =
     new SplitDataStream[R](javaStream)
 
-  implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]):
+  implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: ConnectedJavaStreams[IN1, IN2]):
   ConnectedStreams[IN1, IN2] = new ConnectedStreams[IN1, IN2](javaStream)
 
   implicit def seqToFlinkSource[T: ClassTag: TypeInformation](scalaSeq: Seq[T]) : DataStream[T] =

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 606aac5..5a5a8c9 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -50,7 +50,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     assert("testMap" == dataStream1.getName)
 
     val dataStream2 = env.generateSequence(0, 0).name("testSource2")
-      .groupBy(x=>x)
+      .keyBy(x=>x)
       .reduce((x, y) => 0)
       .name("testReduce")
     assert("testReduce" == dataStream2.getName)
@@ -83,7 +83,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
   }
 
   /**
-   * Tests that {@link DataStream#groupBy} and {@link DataStream#partitionBy(KeySelector)} result in
+   * Tests that {@link DataStream#keyBy} and {@link DataStream#partitionBy(KeySelector)} result in
    * different and correct topologies. Does the some for the {@link ConnectedStreams}.
    */
   @Test
@@ -95,10 +95,10 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
 
     val connected = src1.connect(src2)
 
-    val group1 = src1.groupBy(0)
-    val group2 = src1.groupBy(1, 0)
-    val group3 = src1.groupBy("_1")
-    val group4 = src1.groupBy(x => x._1)
+    val group1 = src1.keyBy(0)
+    val group2 = src1.keyBy(1, 0)
+    val group3 = src1.keyBy("_1")
+    val group4 = src1.keyBy(x => x._1)
 
     val gid1 = createDownStreamId(group1)
     val gid2 = createDownStreamId(group2)
@@ -145,20 +145,20 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     assert(isCustomPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, cpid3)))
 
     //Testing ConnectedStreams grouping
-    val connectedGroup1: ConnectedStreams[_, _] = connected.groupBy(0, 0)
+    val connectedGroup1: ConnectedStreams[_, _] = connected.keyBy(0, 0)
     val downStreamId1: Integer = createDownStreamId(connectedGroup1)
 
-    val connectedGroup2: ConnectedStreams[_, _] = connected.groupBy(Array[Int](0), Array[Int](0))
+    val connectedGroup2: ConnectedStreams[_, _] = connected.keyBy(Array[Int](0), Array[Int](0))
     val downStreamId2: Integer = createDownStreamId(connectedGroup2)
 
-    val connectedGroup3: ConnectedStreams[_, _] = connected.groupBy("_1", "_1")
+    val connectedGroup3: ConnectedStreams[_, _] = connected.keyBy("_1", "_1")
     val downStreamId3: Integer = createDownStreamId(connectedGroup3)
 
     val connectedGroup4: ConnectedStreams[_, _] =
-      connected.groupBy(Array[String]("_1"), Array[String]("_1"))
+      connected.keyBy(Array[String]("_1"), Array[String]("_1"))
     val downStreamId4: Integer = createDownStreamId(connectedGroup4)
 
-    val connectedGroup5: ConnectedStreams[_, _] = connected.groupBy(x => x._1, x => x._1)
+    val connectedGroup5: ConnectedStreams[_, _] = connected.keyBy(x => x._1, x => x._1)
     val downStreamId5: Integer = createDownStreamId(connectedGroup5)
 
     assert(isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, downStreamId1)))
@@ -413,10 +413,10 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     val foldFunction = new FoldFunction[Int, String] {
       override def fold(accumulator: String, value: Int): String = ""
     }
-    val fold = map.groupBy(x=>x).fold("", foldFunction)
+    val fold = map.keyBy(x=>x).fold("", foldFunction)
     assert(foldFunction == getFunctionForDataStream(fold))
     assert(
-      getFunctionForDataStream(map.groupBy(x=>x)
+      getFunctionForDataStream(map.keyBy(x=>x)
         .fold("", (x: String, y: Int) => ""))
         .isInstanceOf[FoldFunction[_, _]])
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
index 0d573a9..e09f164 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
@@ -36,7 +36,7 @@ object OutputFormatTestPrograms {
     val text = env.fromElements(input)
     val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
       .map { (_, 1) }
-      .groupBy(0)
+      .keyBy(0)
       .sum(1)
 
     counts.writeAsText(outputPath)
@@ -51,7 +51,7 @@ object OutputFormatTestPrograms {
     val text = env.fromElements(input)
     val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
       .map { (_, 1) }
-      .groupBy(0)
+      .keyBy(0)
       .sum(1)
 
     counts.writeAsCsv(outputPath)
@@ -66,7 +66,7 @@ object OutputFormatTestPrograms {
     val text = env.fromElements(input)
     val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
       .map { (_, 1) }
-      .groupBy(0)
+      .keyBy(0)
       .sum(1)
       .map(tuple => tuple.toString() + "\n")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
index d5e2b7b..2131026 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
@@ -79,7 +79,7 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {
     })
 
     val splittedResult = sourceStream
-      .groupBy(0)
+      .keyBy(0)
       .fold(0, new FoldFunction[(Int, Int), Int] {
         override def fold(accumulator: Int, value: (Int, Int)): Int = {
           accumulator + value._2

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
index 706b8dd..e5a1c23 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
@@ -88,7 +88,7 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 				.map(new StatefulCounterFunction())
 
 				// -------------- fourth vertex - reducer (failing) and the sink ----------------
-				.groupBy("prefix")
+				.keyBy("prefix")
 				.reduce(new OnceFailingReducer(NUM_STRINGS))
 				.addSink(new SinkFunction<PrefixCount>() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
index aa3e9e4..108e1e6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
@@ -59,7 +59,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
 		DataStream<Integer> stream2 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2));
 
 		stream1.union(stream2)
-				.groupBy(new IdentityKeySelector<Integer>())
+				.keyBy(new IdentityKeySelector<Integer>())
 				.map(new OnceFailingPartitionedSum(NUM_STRINGS))
 				.keyBy(0)
 				.addSink(new CounterSink());

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index de8ee9d..270cfaa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -89,7 +89,7 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 				.startNewChain()
 
 				// -------------- fourth vertex - reducer and the sink ----------------
-				.groupBy(0)
+				.keyBy(0)
 				.reduce(new OnceFailingReducer(NUM_LONGS))
 				.addSink(new SinkFunction<Tuple1<Long>>() {
 					@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 882634b..0804d53 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -70,7 +70,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 				.map(new StatefulCounterFunction())
 
 						// -------------- third vertex - counter and the sink ----------------
-				.groupBy("prefix")
+				.keyBy("prefix")
 				.map(new OnceFailingPrefixCounter(NUM_STRINGS))
 				.addSink(new SinkFunction<PrefixCount>() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
index 76c8e54..a19d8f2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
@@ -42,7 +42,7 @@ public class StreamingProgram {
 		DataStream<String> text = env.fromElements(WordCountData.TEXT).rebalance();
 
 		DataStream<Word> counts =
-				text.flatMap(new Tokenizer()).groupBy("word").sum("frequency");
+				text.flatMap(new Tokenizer()).keyBy("word").sum("frequency");
 
 		counts.addSink(new NoOpSink());