You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/05 16:42:43 UTC
[09/13] flink git commit: [FLINK-2550] Remove groupBy and GroupedDataStream
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index abc1a18..6c439b9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -194,7 +194,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
DataStream<OuterPojo> sourceStream22 = env.addSource(new PojoSource());
sourceStream21
- .groupBy(2, 2)
+ .keyBy(2, 2)
.window(Time.of(10, new MyTimestamp(), 0))
.every(Time.of(4, new MyTimestamp(), 0))
.maxBy(3)
@@ -260,7 +260,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
return new Tuple2<Long, Integer>(value, 1);
}
})
- .groupBy(0)
+ .keyBy(0)
.window(Count.of(10000)).sum(1).flatten()
.filter(new FilterFunction<Tuple2<Long, Integer>>() {
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
index 512a0df..19a61ba 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
@@ -179,7 +179,7 @@ public class SelfConnectionTest extends StreamingMultipleProgramsTestBase {
public void flatMap(Integer value, Collector<String> out) throws Exception {
out.collect("x " + value);
}
- }).groupBy(new KeySelector<String, Integer>() {
+ }).keyBy(new KeySelector<String, Integer>() {
private static final long serialVersionUID = 1L;
@@ -197,7 +197,7 @@ public class SelfConnectionTest extends StreamingMultipleProgramsTestBase {
public Long map(Integer value) throws Exception {
return Long.valueOf(value + 1);
}
- }).groupBy(new KeySelector<Long, Long>() {
+ }).keyBy(new KeySelector<Long, Long>() {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java
index b762d65..2c06c00 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java
@@ -69,7 +69,7 @@ public class ParallelMergeITCase extends StreamingProgramTestBase {
DataStream<Tuple2<String, Integer>> counts =
text.flatMap(new Tokenizer())
.window(Time.of(1000, TimeUnit.MILLISECONDS))
- .groupBy(0)
+ .keyBy(0)
.sum(1)
.flatten();
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
index db09373..4611966 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
@@ -107,15 +107,15 @@ public class WindowingITCase extends StreamingMultipleProgramsTestBase {
source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 1)).sum(0).getDiscretizedStream()
.addSink(new TestSink1());
- source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2))
+ source.window(Time.of(4, ts, 1)).keyBy(new ModKey(2))
.mapWindow(new IdentityWindowMap())
.flatten()
.addSink(new TestSink2()).name("TESTSIUNK2");
- source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
+ source.keyBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
.addSink(new TestSink4());
- source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2))
+ source.keyBy(new ModKey(3)).window(Count.of(2)).keyBy(new ModKey(2))
.mapWindow(new IdentityWindowMap())
.flatten()
.addSink(new TestSink5());
@@ -123,14 +123,14 @@ public class WindowingITCase extends StreamingMultipleProgramsTestBase {
source.window(Time.of(2, ts)).every(Time.of(3, ts)).min(0).getDiscretizedStream()
.addSink(new TestSink3());
- source.groupBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream()
+ source.keyBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream()
.addSink(new TestSink6());
source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap())
.flatten()
.addSink(new TestSink7());
- source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).groupBy(new ModKey(2)).sum(0)
+ source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).keyBy(new ModKey(2)).sum(0)
.getDiscretizedStream()
.addSink(new TestSink8());
@@ -152,7 +152,7 @@ public class WindowingITCase extends StreamingMultipleProgramsTestBase {
source.every(Count.of(4)).sum(0).getDiscretizedStream().addSink(new TestSink11());
- source.window(FullStream.window()).every(Count.of(4)).groupBy(key).sum(0)
+ source.window(FullStream.window()).every(Count.of(4)).keyBy(key).sum(0)
.getDiscretizedStream()
.addSink(new TestSink12());
@@ -197,7 +197,7 @@ public class WindowingITCase extends StreamingMultipleProgramsTestBase {
source2.window(Time.of(2, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink9());
- source3.window(Time.of(5, ts, 1)).groupBy(new ModKey(2)).sum(0).getDiscretizedStream()
+ source3.window(Time.of(5, ts, 1)).keyBy(new ModKey(2)).sum(0).getDiscretizedStream()
.addSink(new TestSink10());
source
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
index 7ac5616..5377e09 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
@@ -42,7 +42,7 @@ public class CsvOutputFormatITCase extends StreamingProgramTestBase {
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
- .groupBy(0).sum(1);
+ .keyBy(0).sum(1);
counts.writeAsCsv(resultPath);
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
index bf96cc1..49876ec 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
@@ -38,7 +38,7 @@ public class SocketOutputFormatITCase extends SocketOutputTestBase {
DataStream<String> counts =
text.flatMap(new CsvOutputFormatITCase.Tokenizer())
- .groupBy(0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {
+ .keyBy(0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {
@Override
public String map(Tuple2<String, Integer> value) throws Exception {
return value.toString() + "\n";
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
index 6bbcea8..380f00d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
@@ -40,7 +40,7 @@ public class TextOutputFormatITCase extends StreamingProgramTestBase {
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new CsvOutputFormatITCase.Tokenizer())
- .groupBy(0).sum(1);
+ .keyBy(0).sum(1);
counts.writeAsText(resultPath);
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
index 1473097..17add2c 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
@@ -66,7 +66,7 @@ public class SocketTextStreamWordCount {
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
- .groupBy(0)
+ .keyBy(0)
.sum(1);
if (fileOutput) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
index 32a2dfe..c2477b5 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
@@ -69,7 +69,7 @@ public class TwitterStream {
// selecting English tweets and splitting to (word, 1)
.flatMap(new SelectEnglishAndTokenizeFlatMap())
// group by words and sum their occurrences
- .groupBy(0).sum(1);
+ .keyBy(0).sum(1);
// emit result
if (fileOutput) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index f8d8652..4730cc1 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -76,7 +76,7 @@ public class SessionWindowing {
});
// We create sessions for each id with max timeout of 3 time units
- DataStream<Tuple3<String, Long, Integer>> aggregated = source.groupBy(0)
+ DataStream<Tuple3<String, Long, Integer>> aggregated = source.keyBy(0)
.window(new SessionTriggerPolicy(3L),
new TumblingEvictionPolicy<Tuple3<String, Long, Integer>>()).sum(2)
.flatten();
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index 1b48387..55d48dd 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -60,7 +60,7 @@ public class TopSpeedWindowing {
} else {
carData = env.addSource(CarSource.create(numOfCars));
}
- DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData.groupBy(0)
+ DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData.keyBy(0)
.window(Time.of(evictionSec * 1000, new CarTimestamp()))
.every(Delta.of(triggerMeters,
new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
index bd3acc6..023a36a 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
@@ -72,7 +72,7 @@ public class WindowWordCount {
// create windows of windowSize records slided every slideSize records
.window(Count.of(windowSize)).every(Count.of(slideSize))
// group by the tuple field "0" and sum up tuple field "1"
- .groupBy(0).sum(1)
+ .keyBy(0).sum(1)
// flatten the windows to a single stream
.flatten();
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
index 5ff3fc1..591ef51 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
@@ -63,7 +63,7 @@ public class PojoExample {
// split up the lines into Word objects
text.flatMap(new Tokenizer())
// group by the field word and sum up the frequency
- .groupBy("word").sum("frequency");
+ .keyBy("word").sum("frequency");
if (fileOutput) {
counts.writeAsText(outputPath);
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
index c207d60..a594c94 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -67,7 +67,7 @@ public class WordCount {
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
- .groupBy(0).sum(1);
+ .keyBy(0).sum(1);
// emit result
if (fileOutput) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
index 4dbc3fb..9ec17d4 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
@@ -57,7 +57,7 @@ object SocketTextStreamWordCount {
val text = env.socketTextStream(hostName, port)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
- .groupBy(0)
+ .keyBy(0)
.sum(1)
if (fileOutput) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
index 9603f71..8e3c7d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -56,7 +56,7 @@ object TopSpeedWindowing {
val cars = setCarsInput(env)
- val topSeed = cars.groupBy("carId")
+ val topSeed = cars.keyBy("carId")
.window(Time.of(evictionSec * 1000, (car : CarEvent) => car.time))
.every(Delta.of[CarEvent](triggerMeters,
(oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0)))
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index 41c1a7a..4727cc5 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -169,8 +169,8 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
* second input stream.
* @return @return The transformed { @link ConnectedStreams}
*/
- def groupBy(keyPosition1: Int, keyPosition2: Int): ConnectedStreams[IN1, IN2] = {
- javaStream.groupBy(keyPosition1, keyPosition2)
+ def keyBy(keyPosition1: Int, keyPosition2: Int): ConnectedStreams[IN1, IN2] = {
+ javaStream.keyBy(keyPosition1, keyPosition2)
}
/**
@@ -185,9 +185,9 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
* The fields used to group the second input stream.
* @return @return The transformed { @link ConnectedStreams}
*/
- def groupBy(keyPositions1: Array[Int], keyPositions2: Array[Int]):
+ def keyBy(keyPositions1: Array[Int], keyPositions2: Array[Int]):
ConnectedStreams[IN1, IN2] = {
- javaStream.groupBy(keyPositions1, keyPositions2)
+ javaStream.keyBy(keyPositions1, keyPositions2)
}
/**
@@ -203,8 +203,8 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
* The grouping expression for the second input
* @return The grouped { @link ConnectedStreams}
*/
- def groupBy(field1: String, field2: String): ConnectedStreams[IN1, IN2] = {
- javaStream.groupBy(field1, field2)
+ def keyBy(field1: String, field2: String): ConnectedStreams[IN1, IN2] = {
+ javaStream.keyBy(field1, field2)
}
/**
@@ -221,9 +221,9 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
* The grouping expressions for the second input
* @return The grouped { @link ConnectedStreams}
*/
- def groupBy(fields1: Array[String], fields2: Array[String]):
+ def keyBy(fields1: Array[String], fields2: Array[String]):
ConnectedStreams[IN1, IN2] = {
- javaStream.groupBy(fields1, fields2)
+ javaStream.keyBy(fields1, fields2)
}
/**
@@ -238,7 +238,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
* The function used for grouping the second input
* @return The grouped { @link ConnectedStreams}
*/
- def groupBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
+ def keyBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
ConnectedStreams[IN1, IN2] = {
val cleanFun1 = clean(fun1)
@@ -250,7 +250,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
def getKey(in: IN2) = cleanFun2(in)
}
- javaStream.groupBy(keyExtractor1, keyExtractor2)
+ javaStream.keyBy(keyExtractor1, keyExtractor2)
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index c9aee61..07828db 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -209,58 +209,31 @@ class DataStream[T](javaStream: JavaStream[T]) {
*/
def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2] =
javaStream.connect(dataStream.getJavaStream)
-
-
-
- /**
- * Partitions the operator states of the DataStream by the given key positions
- * (for tuple/array types).
- */
- def keyBy(fields: Int*): DataStream[T] = javaStream.keyBy(fields: _*)
-
- /**
- *
- * Partitions the operator states of the DataStream by the given field expressions.
- */
- def keyBy(firstField: String, otherFields: String*): DataStream[T] =
- javaStream.keyBy(firstField +: otherFields.toArray: _*)
-
-
- /**
- * Partitions the operator states of the DataStream by the given K key.
- */
- def keyBy[K: TypeInformation](fun: T => K): DataStream[T] = {
- val cleanFun = clean(fun)
- val keyExtractor = new KeySelector[T, K] {
- def getKey(in: T) = cleanFun(in)
- }
- javaStream.keyBy(keyExtractor)
- }
/**
* Groups the elements of a DataStream by the given key positions (for tuple/array types) to
* be used with grouped operators like grouped reduce or grouped aggregations.
*/
- def groupBy(fields: Int*): GroupedDataStream[T, JavaTuple] = javaStream.groupBy(fields: _*)
+ def keyBy(fields: Int*): KeyedStream[T, JavaTuple] = javaStream.keyBy(fields: _*)
/**
* Groups the elements of a DataStream by the given field expressions to
* be used with grouped operators like grouped reduce or grouped aggregations.
*/
- def groupBy(firstField: String, otherFields: String*): GroupedDataStream[T, JavaTuple] =
- javaStream.groupBy(firstField +: otherFields.toArray: _*)
+ def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple] =
+ javaStream.keyBy(firstField +: otherFields.toArray: _*)
/**
* Groups the elements of a DataStream by the given K key to
* be used with grouped operators like grouped reduce or grouped aggregations.
*/
- def groupBy[K: TypeInformation](fun: T => K): GroupedDataStream[T, K] = {
+ def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K] = {
val cleanFun = clean(fun)
val keyExtractor = new KeySelector[T, K] {
def getKey(in: T) = cleanFun(in)
}
- javaStream.groupBy(keyExtractor)
+ javaStream.keyBy(keyExtractor)
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
deleted file mode 100644
index e1a963d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.streaming.api.datastream.{ GroupedDataStream => GroupedJavaStream, DataStream => JavaStream }
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
-import org.apache.flink.streaming.api.functions.aggregation.SumAggregator
-import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator
-import org.apache.flink.streaming.api.operators.StreamGroupedReduce
-import scala.reflect.ClassTag
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.functions.FoldFunction
-import org.apache.flink.api.common.functions.ReduceFunction
-
-
-class GroupedDataStream[T, K](javaStream: GroupedJavaStream[T, K])
- extends DataStream[T](javaStream) {
-
- /**
- * Creates a new [[DataStream]] by reducing the elements of this DataStream
- * using an associative reduce function. An independent aggregate is kept per key.
- */
- def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
- if (reducer == null) {
- throw new NullPointerException("Reduce function must not be null.")
- }
-
- javaStream.reduce(reducer)
- }
-
- /**
- * Creates a new [[DataStream]] by reducing the elements of this DataStream
- * using an associative reduce function. An independent aggregate is kept per key.
- */
- def reduce(fun: (T, T) => T): DataStream[T] = {
- if (fun == null) {
- throw new NullPointerException("Reduce function must not be null.")
- }
- val cleanFun = clean(fun)
- val reducer = new ReduceFunction[T] {
- def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
- }
- reduce(reducer)
- }
-
- /**
- * Creates a new [[DataStream]] by folding the elements of this DataStream
- * using an associative fold function and an initial value. An independent
- * aggregate is kept per key.
- */
- def fold[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[T,R]):
- DataStream[R] = {
- if (folder == null) {
- throw new NullPointerException("Fold function must not be null.")
- }
-
- val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
-
- javaStream.fold(initialValue, folder).
- returns(outType).asInstanceOf[JavaStream[R]]
- }
-
- /**
- * Creates a new [[DataStream]] by folding the elements of this DataStream
- * using an associative fold function and an initial value. An independent
- * aggregate is kept per key.
- */
- def fold[R: TypeInformation: ClassTag](initialValue: R, fun: (R,T) => R): DataStream[R] = {
- if (fun == null) {
- throw new NullPointerException("Fold function must not be null.")
- }
- val cleanFun = clean(fun)
- val folder = new FoldFunction[T,R] {
- def fold(acc: R, v: T) = {
- cleanFun(acc, v)
- }
- }
- fold(initialValue, folder)
- }
-
- /**
- * Applies an aggregation that that gives the current maximum of the data stream at
- * the given position by the given key. An independent aggregate is kept per key.
- *
- */
- def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
-
- /**
- * Applies an aggregation that that gives the current maximum of the data stream at
- * the given field by the given key. An independent aggregate is kept per key.
- *
- */
- def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
-
- /**
- * Applies an aggregation that that gives the current minimum of the data stream at
- * the given position by the given key. An independent aggregate is kept per key.
- *
- */
- def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
-
- /**
- * Applies an aggregation that that gives the current minimum of the data stream at
- * the given field by the given key. An independent aggregate is kept per key.
- *
- */
- def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
-
- /**
- * Applies an aggregation that sums the data stream at the given position by the given
- * key. An independent aggregate is kept per key.
- *
- */
- def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
-
- /**
- * Applies an aggregation that sums the data stream at the given field by the given
- * key. An independent aggregate is kept per key.
- *
- */
- def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field)
-
- /**
- * Applies an aggregation that that gives the current minimum element of the data stream by
- * the given position by the given key. An independent aggregate is kept per key.
- * When equality, the first element is returned with the minimal value.
- *
- */
- def minBy(position: Int): DataStream[T] = aggregate(AggregationType
- .MINBY, position)
-
- /**
- * Applies an aggregation that that gives the current minimum element of the data stream by
- * the given field by the given key. An independent aggregate is kept per key.
- * When equality, the first element is returned with the minimal value.
- *
- */
- def minBy(field: String): DataStream[T] = aggregate(AggregationType
- .MINBY, field )
-
- /**
- * Applies an aggregation that that gives the current maximum element of the data stream by
- * the given position by the given key. An independent aggregate is kept per key.
- * When equality, the first element is returned with the maximal value.
- *
- */
- def maxBy(position: Int): DataStream[T] =
- aggregate(AggregationType.MAXBY, position)
-
- /**
- * Applies an aggregation that that gives the current maximum element of the data stream by
- * the given field by the given key. An independent aggregate is kept per key.
- * When equality, the first element is returned with the maximal value.
- *
- */
- def maxBy(field: String): DataStream[T] =
- aggregate(AggregationType.MAXBY, field)
-
- private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
- val position = fieldNames2Indices(javaStream.getType(), Array(field))(0)
- aggregate(aggregationType, position)
- }
-
- private def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = {
-
- val reducer = aggregationType match {
- case AggregationType.SUM =>
- new SumAggregator(position, javaStream.getType, javaStream.getExecutionConfig)
- case _ =>
- new ComparableAggregator(position, javaStream.getType, aggregationType, true,
- javaStream.getExecutionConfig)
- }
-
- val invokable = new StreamGroupedReduce[T](reducer,javaStream.getKeySelector())
-
- new DataStream[T](javaStream.transform("aggregation", javaStream.getType(),invokable))
- .asInstanceOf[DataStream[T]]
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
new file mode 100644
index 0000000..25244cd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.streaming.api.datastream.{ KeyedStream => KeyedJavaStream, DataStream => JavaStream }
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
+import org.apache.flink.streaming.api.functions.aggregation.SumAggregator
+import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator
+import org.apache.flink.streaming.api.operators.StreamGroupedReduce
+import scala.reflect.ClassTag
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.functions.FoldFunction
+import org.apache.flink.api.common.functions.ReduceFunction
+
+
+class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T](javaStream) {
+
+ /**
+ * Creates a new [[DataStream]] by reducing the elements of this DataStream
+ * using an associative reduce function. An independent aggregate is kept per key.
+ */
+ def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
+ if (reducer == null) {
+ throw new NullPointerException("Reduce function must not be null.")
+ }
+
+ javaStream.reduce(reducer)
+ }
+
+ /**
+ * Creates a new [[DataStream]] by reducing the elements of this DataStream
+ * using an associative reduce function. An independent aggregate is kept per key.
+ */
+ def reduce(fun: (T, T) => T): DataStream[T] = {
+ if (fun == null) {
+ throw new NullPointerException("Reduce function must not be null.")
+ }
+ val cleanFun = clean(fun)
+ val reducer = new ReduceFunction[T] {
+ def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+ }
+ reduce(reducer)
+ }
+
+ /**
+ * Creates a new [[DataStream]] by folding the elements of this DataStream
+ * using an associative fold function and an initial value. An independent
+ * aggregate is kept per key.
+ */
+ def fold[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[T,R]):
+ DataStream[R] = {
+ if (folder == null) {
+ throw new NullPointerException("Fold function must not be null.")
+ }
+
+ val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
+
+ javaStream.fold(initialValue, folder).
+ returns(outType).asInstanceOf[JavaStream[R]]
+ }
+
+ /**
+ * Creates a new [[DataStream]] by folding the elements of this DataStream
+ * using an associative fold function and an initial value. An independent
+ * aggregate is kept per key.
+ */
+ def fold[R: TypeInformation: ClassTag](initialValue: R, fun: (R,T) => R): DataStream[R] = {
+ if (fun == null) {
+ throw new NullPointerException("Fold function must not be null.")
+ }
+ val cleanFun = clean(fun)
+ val folder = new FoldFunction[T,R] {
+ def fold(acc: R, v: T) = {
+ cleanFun(acc, v)
+ }
+ }
+ fold(initialValue, folder)
+ }
+
+ /**
+ * Applies an aggregation that gives the current maximum of the data stream at
+ * the given position by the given key. An independent aggregate is kept per key.
+ *
+ */
+ def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
+
+ /**
+ * Applies an aggregation that gives the current maximum of the data stream at
+ * the given field by the given key. An independent aggregate is kept per key.
+ *
+ */
+ def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
+
+ /**
+ * Applies an aggregation that gives the current minimum of the data stream at
+ * the given position by the given key. An independent aggregate is kept per key.
+ *
+ */
+ def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
+
+ /**
+ * Applies an aggregation that gives the current minimum of the data stream at
+ * the given field by the given key. An independent aggregate is kept per key.
+ *
+ */
+ def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
+
+ /**
+ * Applies an aggregation that sums the data stream at the given position by the given
+ * key. An independent aggregate is kept per key.
+ *
+ */
+ def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
+
+ /**
+ * Applies an aggregation that sums the data stream at the given field by the given
+ * key. An independent aggregate is kept per key.
+ *
+ */
+ def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field)
+
+ /**
+ * Applies an aggregation that gives the current minimum element of the data stream by
+ * the given position by the given key. An independent aggregate is kept per key.
+ * If several elements share the minimal value, the first one is returned.
+ *
+ */
+ def minBy(position: Int): DataStream[T] = aggregate(AggregationType
+ .MINBY, position)
+
+ /**
+ * Applies an aggregation that gives the current minimum element of the data stream by
+ * the given field by the given key. An independent aggregate is kept per key.
+ * If several elements share the minimal value, the first one is returned.
+ *
+ */
+ def minBy(field: String): DataStream[T] = aggregate(AggregationType
+ .MINBY, field )
+
+ /**
+ * Applies an aggregation that gives the current maximum element of the data stream by
+ * the given position by the given key. An independent aggregate is kept per key.
+ * If several elements share the maximal value, the first one is returned.
+ *
+ */
+ def maxBy(position: Int): DataStream[T] =
+ aggregate(AggregationType.MAXBY, position)
+
+ /**
+ * Applies an aggregation that gives the current maximum element of the data stream by
+ * the given field by the given key. An independent aggregate is kept per key.
+ * If several elements share the maximal value, the first one is returned.
+ *
+ */
+ def maxBy(field: String): DataStream[T] =
+ aggregate(AggregationType.MAXBY, field)
+
+ private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
+ val position = fieldNames2Indices(javaStream.getType(), Array(field))(0)
+ aggregate(aggregationType, position)
+ }
+
+ private def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = {
+
+ val reducer = aggregationType match {
+ case AggregationType.SUM =>
+ new SumAggregator(position, javaStream.getType, javaStream.getExecutionConfig)
+ case _ =>
+ new ComparableAggregator(position, javaStream.getType, aggregationType, true,
+ javaStream.getExecutionConfig)
+ }
+
+ val invokable = new StreamGroupedReduce[T](reducer,javaStream.getKeySelector())
+
+ new DataStream[T](javaStream.transform("aggregation", javaStream.getType(),invokable))
+ .asInstanceOf[DataStream[T]]
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
index f584767..e0bbaf8 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -151,7 +151,7 @@ object StreamJoinOperator {
private def createJoinOperator(): JavaStream[(I1, I2)] = {
// val returnType = createTuple2TypeInformation[I1, I2](op.input1.getType, op.input2.getType)
-// op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
+// op.input1.keyBy(keys1).connect(op.input2.keyBy(keys2))
// .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
// returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
null
@@ -172,7 +172,7 @@ object StreamJoinOperator {
val cleanFun = clean(getJoinWindowFunction(jp, fun))
-// op.input1.groupBy(jp.keys1).connect(op.input2.groupBy(jp.keys2))
+// op.input1.keyBy(jp.keys1).connect(op.input2.keyBy(jp.keys2))
// .addGeneralWindowCombine[R](
// cleanFun,
// implicitly[TypeInformation[R]],
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
index f4d2154..8ef94f0 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
@@ -80,7 +80,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
* DataStream.window(...) operator on an already grouped data stream.
*
*/
- def groupBy(fields: Int*): WindowedDataStream[T] = javaStream.groupBy(fields: _*)
+ def keyBy(fields: Int*): WindowedDataStream[T] = javaStream.keyBy(fields: _*)
/**
* Groups the elements of the WindowedDataStream using the given
@@ -91,8 +91,8 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
* DataStream.window(...) operator on an already grouped data stream.
*
*/
- def groupBy(firstField: String, otherFields: String*): WindowedDataStream[T] =
- javaStream.groupBy(firstField +: otherFields.toArray: _*)
+ def keyBy(firstField: String, otherFields: String*): WindowedDataStream[T] =
+ javaStream.keyBy(firstField +: otherFields.toArray: _*)
/**
* Groups the elements of the WindowedDataStream using the given
@@ -103,13 +103,13 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
* DataStream.window(...) operator on an already grouped data stream.
*
*/
- def groupBy[K: TypeInformation](fun: T => K): WindowedDataStream[T] = {
+ def keyBy[K: TypeInformation](fun: T => K): WindowedDataStream[T] = {
val cleanFun = clean(fun)
val keyExtractor = new KeySelector[T, K] {
def getKey(in: T) = cleanFun(in)
}
- javaStream.groupBy(keyExtractor)
+ javaStream.keyBy(keyExtractor)
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index b8a3b94..625678a 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -26,8 +26,8 @@ import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
-import org.apache.flink.streaming.api.datastream.{ ConnectedStreams => JavaConStream }
-import org.apache.flink.streaming.api.datastream.{ GroupedDataStream => GroupedJavaStream }
+import org.apache.flink.streaming.api.datastream.{ ConnectedStreams => ConnectedJavaStreams }
+import org.apache.flink.streaming.api.datastream.{ KeyedStream => KeyedJavaStream }
import language.implicitConversions
package object scala {
@@ -38,8 +38,8 @@ package object scala {
implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
new DataStream[R](javaStream)
- implicit def javaToScalaGroupedStream[R, K](javaStream: GroupedJavaStream[R, K]):
- GroupedDataStream[R, K] = new GroupedDataStream[R, K](javaStream)
+ implicit def javaToScalaGroupedStream[R, K](javaStream: KeyedJavaStream[R, K]):
+ KeyedStream[R, K] = new KeyedStream[R, K](javaStream)
implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] =
new WindowedDataStream[R](javaWStream)
@@ -47,7 +47,7 @@ package object scala {
implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R] =
new SplitDataStream[R](javaStream)
- implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]):
+ implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: ConnectedJavaStreams[IN1, IN2]):
ConnectedStreams[IN1, IN2] = new ConnectedStreams[IN1, IN2](javaStream)
implicit def seqToFlinkSource[T: ClassTag: TypeInformation](scalaSeq: Seq[T]) : DataStream[T] =
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 606aac5..5a5a8c9 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -50,7 +50,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
assert("testMap" == dataStream1.getName)
val dataStream2 = env.generateSequence(0, 0).name("testSource2")
- .groupBy(x=>x)
+ .keyBy(x=>x)
.reduce((x, y) => 0)
.name("testReduce")
assert("testReduce" == dataStream2.getName)
@@ -83,7 +83,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
}
/**
- * Tests that {@link DataStream#groupBy} and {@link DataStream#partitionBy(KeySelector)} result in
+ * Tests that {@link DataStream#keyBy} and {@link DataStream#partitionBy(KeySelector)} result in
* different and correct topologies. Does the some for the {@link ConnectedStreams}.
*/
@Test
@@ -95,10 +95,10 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
val connected = src1.connect(src2)
- val group1 = src1.groupBy(0)
- val group2 = src1.groupBy(1, 0)
- val group3 = src1.groupBy("_1")
- val group4 = src1.groupBy(x => x._1)
+ val group1 = src1.keyBy(0)
+ val group2 = src1.keyBy(1, 0)
+ val group3 = src1.keyBy("_1")
+ val group4 = src1.keyBy(x => x._1)
val gid1 = createDownStreamId(group1)
val gid2 = createDownStreamId(group2)
@@ -145,20 +145,20 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
assert(isCustomPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, cpid3)))
//Testing ConnectedStreams grouping
- val connectedGroup1: ConnectedStreams[_, _] = connected.groupBy(0, 0)
+ val connectedGroup1: ConnectedStreams[_, _] = connected.keyBy(0, 0)
val downStreamId1: Integer = createDownStreamId(connectedGroup1)
- val connectedGroup2: ConnectedStreams[_, _] = connected.groupBy(Array[Int](0), Array[Int](0))
+ val connectedGroup2: ConnectedStreams[_, _] = connected.keyBy(Array[Int](0), Array[Int](0))
val downStreamId2: Integer = createDownStreamId(connectedGroup2)
- val connectedGroup3: ConnectedStreams[_, _] = connected.groupBy("_1", "_1")
+ val connectedGroup3: ConnectedStreams[_, _] = connected.keyBy("_1", "_1")
val downStreamId3: Integer = createDownStreamId(connectedGroup3)
val connectedGroup4: ConnectedStreams[_, _] =
- connected.groupBy(Array[String]("_1"), Array[String]("_1"))
+ connected.keyBy(Array[String]("_1"), Array[String]("_1"))
val downStreamId4: Integer = createDownStreamId(connectedGroup4)
- val connectedGroup5: ConnectedStreams[_, _] = connected.groupBy(x => x._1, x => x._1)
+ val connectedGroup5: ConnectedStreams[_, _] = connected.keyBy(x => x._1, x => x._1)
val downStreamId5: Integer = createDownStreamId(connectedGroup5)
assert(isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, downStreamId1)))
@@ -413,10 +413,10 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
val foldFunction = new FoldFunction[Int, String] {
override def fold(accumulator: String, value: Int): String = ""
}
- val fold = map.groupBy(x=>x).fold("", foldFunction)
+ val fold = map.keyBy(x=>x).fold("", foldFunction)
assert(foldFunction == getFunctionForDataStream(fold))
assert(
- getFunctionForDataStream(map.groupBy(x=>x)
+ getFunctionForDataStream(map.keyBy(x=>x)
.fold("", (x: String, y: Int) => ""))
.isInstanceOf[FoldFunction[_, _]])
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
index 0d573a9..e09f164 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
@@ -36,7 +36,7 @@ object OutputFormatTestPrograms {
val text = env.fromElements(input)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
- .groupBy(0)
+ .keyBy(0)
.sum(1)
counts.writeAsText(outputPath)
@@ -51,7 +51,7 @@ object OutputFormatTestPrograms {
val text = env.fromElements(input)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
- .groupBy(0)
+ .keyBy(0)
.sum(1)
counts.writeAsCsv(outputPath)
@@ -66,7 +66,7 @@ object OutputFormatTestPrograms {
val text = env.fromElements(input)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
- .groupBy(0)
+ .keyBy(0)
.sum(1)
.map(tuple => tuple.toString() + "\n")
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
index d5e2b7b..2131026 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
@@ -79,7 +79,7 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {
})
val splittedResult = sourceStream
- .groupBy(0)
+ .keyBy(0)
.fold(0, new FoldFunction[(Int, Int), Int] {
override def fold(accumulator: Int, value: (Int, Int)): Int = {
accumulator + value._2
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
index 706b8dd..e5a1c23 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
@@ -88,7 +88,7 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
.map(new StatefulCounterFunction())
// -------------- fourth vertex - reducer (failing) and the sink ----------------
- .groupBy("prefix")
+ .keyBy("prefix")
.reduce(new OnceFailingReducer(NUM_STRINGS))
.addSink(new SinkFunction<PrefixCount>() {
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
index aa3e9e4..108e1e6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
@@ -59,7 +59,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
DataStream<Integer> stream2 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2));
stream1.union(stream2)
- .groupBy(new IdentityKeySelector<Integer>())
+ .keyBy(new IdentityKeySelector<Integer>())
.map(new OnceFailingPartitionedSum(NUM_STRINGS))
.keyBy(0)
.addSink(new CounterSink());
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index de8ee9d..270cfaa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -89,7 +89,7 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
.startNewChain()
// -------------- fourth vertex - reducer and the sink ----------------
- .groupBy(0)
+ .keyBy(0)
.reduce(new OnceFailingReducer(NUM_LONGS))
.addSink(new SinkFunction<Tuple1<Long>>() {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 882634b..0804d53 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -70,7 +70,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
.map(new StatefulCounterFunction())
// -------------- third vertex - counter and the sink ----------------
- .groupBy("prefix")
+ .keyBy("prefix")
.map(new OnceFailingPrefixCounter(NUM_STRINGS))
.addSink(new SinkFunction<PrefixCount>() {
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
index 76c8e54..a19d8f2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
@@ -42,7 +42,7 @@ public class StreamingProgram {
DataStream<String> text = env.fromElements(WordCountData.TEXT).rebalance();
DataStream<Word> counts =
- text.flatMap(new Tokenizer()).groupBy("word").sum("frequency");
+ text.flatMap(new Tokenizer()).keyBy("word").sum("frequency");
counts.addSink(new NoOpSink());