Posted to commits@flink.apache.org by al...@apache.org on 2015/10/05 16:42:44 UTC
[10/13] flink git commit: [FLINK-2550] Remove groupBy and GroupedDataStream
[FLINK-2550] Remove groupBy and GroupedDataStream
Their functionality is subsumed by keyBy and KeyedStream
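For user programs the migration is mechanical: every groupBy call becomes keyBy, as the example changes below show. A minimal self-contained sketch of the before/after (not part of this commit; the input data and class name are made up for illustration):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByMigrationSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Hypothetical input: (word, 1) pairs, like the word-count examples touched by this commit produce.
		DataStream<Tuple2<String, Integer>> pairs = env.fromElements(
				new Tuple2<>("to", 1), new Tuple2<>("be", 1), new Tuple2<>("to", 1));

		// Before this commit: pairs.groupBy(0).sum(1)
		// After this commit the same per-key rolling sum lives on keyBy/KeyedStream:
		pairs.keyBy(0).sum(1).print();

		env.execute("keyBy migration sketch");
	}
}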
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9baadfe8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9baadfe8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9baadfe8
Branch: refs/heads/master
Commit: 9baadfe8464976b039e3f5859b910c4ad2e29ac5
Parents: 23d8e26
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Oct 1 17:56:13 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 5 16:36:35 2015 +0200
----------------------------------------------------------------------
docs/apis/streaming_guide.md | 6 +-
.../api/FlinkTopologyBuilder.java | 4 +-
.../wordcount/BoltTokenizerWordCount.java | 2 +-
.../wordcount/BoltTokenizerWordCountPojo.java | 2 +-
.../BoltTokenizerWordCountWithNames.java | 2 +-
.../wordcount/SpoutSourceWordCount.java | 2 +-
.../examples/java8/wordcount/WordCount.java | 2 +-
.../main/java/SocketTextStreamWordCount.java | 2 +-
.../main/scala/SocketTextStreamWordCount.scala | 2 +-
.../connectors/twitter/TwitterTopology.java | 2 +-
.../api/datastream/ConnectedStreams.java | 40 +--
.../streaming/api/datastream/DataStream.java | 62 ----
.../api/datastream/DiscretizedStream.java | 20 +-
.../api/datastream/GroupedDataStream.java | 329 -------------------
.../api/datastream/IterativeDataStream.java | 19 +-
.../streaming/api/datastream/KeyedStream.java | 297 ++++++++++++++++-
.../api/datastream/WindowedDataStream.java | 62 ++--
.../datastream/temporal/StreamJoinOperator.java | 8 +-
.../flink/streaming/api/CoStreamTest.java | 4 +-
.../flink/streaming/api/DataStreamTest.java | 72 ++--
.../apache/flink/streaming/api/IterateTest.java | 6 +-
.../streaming/api/StreamingOperatorsITCase.java | 4 +-
.../api/complex/ComplexIntegrationTest.java | 4 +-
.../api/operators/co/SelfConnectionTest.java | 4 +-
.../windowing/ParallelMergeITCase.java | 2 +-
.../operators/windowing/WindowingITCase.java | 14 +-
.../api/outputformat/CsvOutputFormatITCase.java | 2 +-
.../outputformat/SocketOutputFormatITCase.java | 2 +-
.../outputformat/TextOutputFormatITCase.java | 2 +-
.../socket/SocketTextStreamWordCount.java | 2 +-
.../examples/twitter/TwitterStream.java | 2 +-
.../examples/windowing/SessionWindowing.java | 2 +-
.../examples/windowing/TopSpeedWindowing.java | 2 +-
.../examples/windowing/WindowWordCount.java | 2 +-
.../examples/wordcount/PojoExample.java | 2 +-
.../streaming/examples/wordcount/WordCount.java | 2 +-
.../socket/SocketTextStreamWordCount.scala | 2 +-
.../examples/windowing/TopSpeedWindowing.scala | 2 +-
.../streaming/api/scala/ConnectedStreams.scala | 20 +-
.../flink/streaming/api/scala/DataStream.scala | 37 +--
.../streaming/api/scala/GroupedDataStream.scala | 196 -----------
.../flink/streaming/api/scala/KeyedStream.scala | 195 +++++++++++
.../api/scala/StreamJoinOperator.scala | 4 +-
.../api/scala/WindowedDataStream.scala | 10 +-
.../flink/streaming/api/scala/package.scala | 10 +-
.../streaming/api/scala/DataStreamTest.scala | 26 +-
.../api/scala/OutputFormatTestPrograms.scala | 6 +-
.../api/scala/StreamingOperatorsITCase.scala | 2 +-
.../CoStreamCheckpointingITCase.java | 2 +-
.../PartitionedStateCheckpointingITCase.java | 2 +-
.../StreamCheckpointNotifierITCase.java | 2 +-
.../StreamCheckpointingITCase.java | 2 +-
.../test/classloading/jar/StreamingProgram.java | 2 +-
53 files changed, 690 insertions(+), 823 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index c437114..3f5a98f 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -627,7 +627,7 @@ dataStream.filter{ _ != 0 }
<br/>
A map that produces a rolling average per key:</p>
{% highlight scala %}
-dataStream.groupBy(..).mapWithState((in, state: Option[(Long, Int)]) => state match {
+dataStream.keyBy(..).mapWithState((in, state: Option[(Long, Int)]) => state match {
case Some((sum, count)) => ((sum + in)/(count + 1), Some((sum + in, count + 1)))
case None => (in, Some((in, 1)))
})
@@ -713,7 +713,7 @@ dataStream.union(otherStream1, otherStream2, …)
### Grouped operators
-Some transformations require that the elements of a `DataStream` are grouped on some key. The user can create a `GroupedDataStream` by calling the `groupBy(key)` method of a non-grouped `DataStream`.
+Some transformations require that the elements of a `DataStream` are grouped on some key. The user can create a `KeyedStream` by calling the `keyBy(key)` method of a non-grouped `DataStream`.
Keys can be of three types: field positions (applicable for tuple/array types), field expressions (applicable for pojo types), KeySelector instances.
Aggregation or reduce operators called on `GroupedDataStream`s produce elements on a per group basis.
@@ -1313,7 +1313,7 @@ Checkpointing of the states needs to be enabled from the `StreamExecutionEnviron
Operator states can be accessed from the `RuntimeContext` using the `getOperatorState(“name”, defaultValue, partitioned)` method so it is only accessible in `RichFunction`s. A recommended usage pattern is to retrieve the operator state in the `open(…)` method of the operator and set it as a field in the operator instance for runtime usage. Multiple `OperatorState`s can be used simultaneously by the same operator by using different names to identify them.
-Partitioned operator state works only on `KeyedDataStreams`. A `KeyedDataStream` can be created from `DataStream` using the `keyBy` or `groupBy` methods. The `keyBy` method simply takes a `KeySelector` to derive the keys by which the operator state will be partitioned, however, it does not affect the actual partitioning of the `DataStream` records. If data partitioning is also desired then the `groupBy` method should be used instead to create a `GroupedDataStream` which is a subtype of `KeyedDataStream`. Mind that `KeyedDataStreams` do not support repartitioning (e.g. `shuffle(), forward(), groupBy(...)`).
+Partitioned operator state works only on `KeyedDataStreams`. A `KeyedDataStream` can be created from `DataStream` using the `keyBy` method. The `keyBy` method takes a `KeySelector` to derive the keys by which the operator state will be partitioned; it also partitions the `DataStream` records by the same keys, so values with the same key go to the same processing instance. Mind that `KeyedDataStreams` do not support repartitioning (e.g. `shuffle(), forward(), keyBy(...)`).
By default operator states are checkpointed using default java serialization thus they need to be `Serializable`. The user can gain more control over the state checkpoint mechanism by passing a `StateCheckpointer` instance when retrieving the `OperatorState` from the `RuntimeContext`. The `StateCheckpointer` allows custom implementations for the checkpointing logic for increased efficiency and to store arbitrary non-serializable states.
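A rough sketch of the partitioned-state pattern the paragraph above describes, assuming the getOperatorState(name, defaultValue, partitioned) accessor and the OperatorState interface named in these docs; the class name, data types and import paths are illustrative assumptions and may differ between versions:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

// Keeps a per-key running sum in partitioned operator state.
// The stream must be keyed first, e.g. stream.keyBy(0).map(new RunningSum()).
public class RunningSum extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

	private OperatorState<Long> sum;

	@Override
	public void open(Configuration parameters) throws Exception {
		// Retrieve the state once in open() and keep it as a field, as recommended above.
		// Accessor and (name, defaultValue, partitioned) signature follow the documentation text.
		sum = getRuntimeContext().getOperatorState("sum", 0L, true);
	}

	@Override
	public Tuple2<String, Long> map(Tuple2<String, Long> value) throws Exception {
		long updated = sum.value() + value.f1;
		sum.update(updated);
		return new Tuple2<>(value.f0, updated);
	}
}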
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
index e4d880f..e2d819c 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
@@ -195,12 +195,12 @@ public class FlinkTopologyBuilder {
if (fields.size() > 0) {
FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
if (producer.size() == 1) {
- inputStream = inputStream.groupBy(prodDeclarer
+ inputStream = inputStream.keyBy(prodDeclarer
.getGroupingFieldIndexes(inputStreamId,
grouping.get_fields()));
} else {
inputStream = inputStream
- .groupBy(new SplitStreamTypeKeySelector(
+ .keyBy(new SplitStreamTypeKeySelector(
prodDeclarer.getGroupingFieldIndexes(
inputStreamId,
grouping.get_fields())));
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
index eab58f5..6f7b6fb 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
@@ -69,7 +69,7 @@ public class BoltTokenizerWordCount {
new StormBoltWrapper<String, Tuple2<String, Integer>>(new StormBoltTokenizer()))
// split up the lines in pairs (2-tuples) containing: (word,1)
// group by the tuple field "0" and sum up tuple field "1"
- .groupBy(0).sum(1);
+ .keyBy(0).sum(1);
// emit result
if (fileOutput) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
index 20e69db..300f5bc 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
@@ -77,7 +77,7 @@ public class BoltTokenizerWordCountPojo {
new StormBoltTokenizerByName()))
// split up the lines in pairs (2-tuples) containing: (word,1)
// group by the tuple field "0" and sum up tuple field "1"
- .groupBy(0).sum(1);
+ .keyBy(0).sum(1);
// emit result
if (fileOutput) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
index e233da1..ed01181 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
@@ -80,7 +80,7 @@ public class BoltTokenizerWordCountWithNames {
new StormBoltTokenizerByName(), new Fields("sentence")))
// split up the lines in pairs (2-tuples) containing: (word,1)
// group by the tuple field "0" and sum up tuple field "1"
- .groupBy(0).sum(1);
+ .keyBy(0).sum(1);
// emit result
if (fileOutput) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
index cbd054b..21d7811 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
@@ -70,7 +70,7 @@ public class SpoutSourceWordCount {
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
- .groupBy(0).sum(1);
+ .keyBy(0).sum(1);
// emit result
if (fileOutput) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java b/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
index c1d6042..41a674f 100644
--- a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
+++ b/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
@@ -73,7 +73,7 @@ public class WordCount {
.forEach(t -> out.collect(new Tuple2<>(t, 1)));
})
// group by the tuple field "0" and sum up tuple field "1"
- .groupBy(0)
+ .keyBy(0)
.sum(1);
// emit result
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/SocketTextStreamWordCount.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/SocketTextStreamWordCount.java b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/SocketTextStreamWordCount.java
index 3888d7b..10d8044 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/SocketTextStreamWordCount.java
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/SocketTextStreamWordCount.java
@@ -76,7 +76,7 @@ public class SocketTextStreamWordCount {
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new LineSplitter())
// group by the tuple field "0" and sum up tuple field "1"
- .groupBy(0)
+ .keyBy(0)
.sum(1);
counts.print();
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/SocketTextStreamWordCount.scala
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/SocketTextStreamWordCount.scala b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/SocketTextStreamWordCount.scala
index 63d840d..9bc85ea 100644
--- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/SocketTextStreamWordCount.scala
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/SocketTextStreamWordCount.scala
@@ -59,7 +59,7 @@ object SocketTextStreamWordCount {
val text = env.socketTextStream(hostName, port)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
- .groupBy(0)
+ .keyBy(0)
.sum(1)
counts print
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
index d5e8c41..b1fc92c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
@@ -82,7 +82,7 @@ public class TwitterTopology {
return new Tuple2<String, Integer>(value, 1);
}
})
- .groupBy(0)
+ .keyBy(0)
.sum(1);
dataStream.print();
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index 2447c1e..4074a1d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -96,7 +96,7 @@ public class ConnectedStreams<IN1, IN2> {
}
/**
- * GroupBy operation for connected data stream. Groups the elements of
+ * KeyBy operation for connected data stream. Assigns keys to the elements of
* input1 and input2 according to keyPosition1 and keyPosition2.
*
* @param keyPosition1
@@ -107,13 +107,13 @@ public class ConnectedStreams<IN1, IN2> {
* second input stream.
* @return The grouped {@link ConnectedStreams}
*/
- public ConnectedStreams<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
- return new ConnectedStreams<>(this.environment, inputStream1.groupBy(keyPosition1),
- inputStream2.groupBy(keyPosition2));
+ public ConnectedStreams<IN1, IN2> keyBy(int keyPosition1, int keyPosition2) {
+ return new ConnectedStreams<>(this.environment, inputStream1.keyBy(keyPosition1),
+ inputStream2.keyBy(keyPosition2));
}
/**
- * GroupBy operation for connected data stream. Groups the elements of
+ * KeyBy operation for connected data stream. Assigns keys to the elements of
* input1 and input2 according to keyPositions1 and keyPositions2.
*
* @param keyPositions1
@@ -122,13 +122,13 @@ public class ConnectedStreams<IN1, IN2> {
* The fields used to group the second input stream.
* @return The grouped {@link ConnectedStreams}
*/
- public ConnectedStreams<IN1, IN2> groupBy(int[] keyPositions1, int[] keyPositions2) {
- return new ConnectedStreams<>(environment, inputStream1.groupBy(keyPositions1),
- inputStream2.groupBy(keyPositions2));
+ public ConnectedStreams<IN1, IN2> keyBy(int[] keyPositions1, int[] keyPositions2) {
+ return new ConnectedStreams<>(environment, inputStream1.keyBy(keyPositions1),
+ inputStream2.keyBy(keyPositions2));
}
/**
- * GroupBy operation for connected data stream using key expressions. Groups
+ * KeyBy operation for connected data stream using key expressions. Assigns keys to
* the elements of input1 and input2 according to field1 and field2. A field
* expression is either the name of a public field or a getter method with
* parentheses of the {@link DataStream}S underlying type. A dot can be used
@@ -140,13 +140,13 @@ public class ConnectedStreams<IN1, IN2> {
* The grouping expression for the second input
* @return The grouped {@link ConnectedStreams}
*/
- public ConnectedStreams<IN1, IN2> groupBy(String field1, String field2) {
- return new ConnectedStreams<>(environment, inputStream1.groupBy(field1),
- inputStream2.groupBy(field2));
+ public ConnectedStreams<IN1, IN2> keyBy(String field1, String field2) {
+ return new ConnectedStreams<>(environment, inputStream1.keyBy(field1),
+ inputStream2.keyBy(field2));
}
/**
- * GroupBy operation for connected data stream using key expressions. Groups
+ * KeyBy operation for connected data stream using key expressions. Assigns keys to
* the elements of input1 and input2 according to fields1 and fields2. A
* field expression is either the name of a public field or a getter method
* with parentheses of the {@link DataStream}S underlying type. A dot can be
@@ -159,13 +159,13 @@ public class ConnectedStreams<IN1, IN2> {
* The grouping expressions for the second input
* @return The grouped {@link ConnectedStreams}
*/
- public ConnectedStreams<IN1, IN2> groupBy(String[] fields1, String[] fields2) {
- return new ConnectedStreams<>(environment, inputStream1.groupBy(fields1),
- inputStream2.groupBy(fields2));
+ public ConnectedStreams<IN1, IN2> keyBy(String[] fields1, String[] fields2) {
+ return new ConnectedStreams<>(environment, inputStream1.keyBy(fields1),
+ inputStream2.keyBy(fields2));
}
/**
- * GroupBy operation for connected data stream. Groups the elements of
+ * KeyBy operation for connected data stream. Assigns keys to the elements of
* input1 and input2 using keySelector1 and keySelector2.
*
* @param keySelector1
@@ -174,9 +174,9 @@ public class ConnectedStreams<IN1, IN2> {
* The {@link KeySelector} used for grouping the second input
* @return The partitioned {@link ConnectedStreams}
*/
- public ConnectedStreams<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
- return new ConnectedStreams<>(environment, inputStream1.groupBy(keySelector1),
- inputStream2.groupBy(keySelector2));
+ public ConnectedStreams<IN1, IN2> keyBy(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
+ return new ConnectedStreams<>(environment, inputStream1.keyBy(keySelector1),
+ inputStream2.keyBy(keySelector2));
}
/**
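A small illustrative sketch (not part of this commit) of the renamed two-input keying: both inputs of a ConnectedStreams are keyed by their first tuple field, exactly where groupBy(0, 0) was used before. The input data and class name are made up:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class ConnectedKeyBySketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Hypothetical inputs: (word, count) pairs and (word, length) pairs.
		DataStream<Tuple2<String, Integer>> counts =
				env.fromElements(new Tuple2<>("flink", 3), new Tuple2<>("storm", 1));
		DataStream<Tuple2<String, Integer>> lengths =
				env.fromElements(new Tuple2<>("flink", 5), new Tuple2<>("storm", 5));

		counts.connect(lengths)
				// formerly groupBy(0, 0): key both inputs by the first tuple field
				.keyBy(0, 0)
				.map(new CoMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
					@Override
					public String map1(Tuple2<String, Integer> count) {
						return count.f0 + " count=" + count.f1;
					}

					@Override
					public String map2(Tuple2<String, Integer> length) {
						return length.f0 + " length=" + length.f1;
					}
				})
				.print();

		env.execute("ConnectedStreams keyBy sketch");
	}
}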
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 3389016..6d88416 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -278,68 +278,6 @@ public class DataStream<T> {
return new KeyedStream<T, Tuple>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
getType(), getExecutionConfig())));
}
-
- /**
- * Partitions the operator state of a {@link DataStream} by the given key positions.
- * Mind that keyBy does not affect the partitioning of the {@link DataStream}
- * but only the way explicit state is partitioned among parallel instances.
- *
- * @param fields
- * The position of the fields on which the states of the {@link DataStream}
- * will be partitioned.
- * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
- */
- public GroupedDataStream<T, Tuple> groupBy(int... fields) {
- if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
- return groupBy(new KeySelectorUtil.ArrayKeySelector<T>(fields));
- } else {
- return groupBy(new Keys.ExpressionKeys<T>(fields, getType()));
- }
- }
-
- /**
- * Groups a {@link DataStream} using field expressions. A field expression
- * is either the name of a public field or a getter method with parentheses
- * of the {@link DataStream}S underlying type. A dot can be used to drill
- * down into objects, as in {@code "field1.getInnerField2()" }. This method
- * returns an {@link GroupedDataStream}.
- *
- * <p>
- * This operator also affects the
- * partitioning of the stream, by forcing values with the same key to go to
- * the same processing instance.
- *
- * @param fields
- * One or more field expressions on which the DataStream will be
- * grouped.
- * @return The grouped {@link DataStream}
- **/
- public GroupedDataStream<T, Tuple> groupBy(String... fields) {
- return groupBy(new Keys.ExpressionKeys<T>(fields, getType()));
- }
-
- /**
- * Groups the elements of a {@link DataStream} by the key extracted by the
- * {@link KeySelector} to be used with grouped operators like
- * {@link GroupedDataStream#reduce(org.apache.flink.api.common.functions.ReduceFunction)}.
- *
- * <p/>
- * This operator also affects the partitioning of the stream, by forcing
- * values with the same key to go to the same processing instance.
- *
- * @param keySelector
- * The {@link KeySelector} that will be used to extract keys for
- * the values
- * @return The grouped {@link DataStream}
- */
- public <K> GroupedDataStream<T, K> groupBy(KeySelector<T, K> keySelector) {
- return new GroupedDataStream<T, K>(this, clean(keySelector));
- }
-
- private GroupedDataStream<T, Tuple> groupBy(Keys<T> keys) {
- return new GroupedDataStream<T, Tuple>(this,
- clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig())));
- }
/**
* Sets the partitioning of the {@link DataStream} so that the output is
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index 5893295..18c2cee 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -66,10 +66,10 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
protected boolean isPartitioned = false;
protected DiscretizedStream(SingleOutputStreamOperator<StreamWindow<OUT>, ?> discretizedStream,
- KeySelector<OUT, ?> groupByKey, WindowTransformation tranformation,
+ KeySelector<OUT, ?> keyByKey, WindowTransformation tranformation,
boolean isPartitioned) {
super();
- this.groupByKey = groupByKey;
+ this.keyByKey = keyByKey;
this.discretizedStream = discretizedStream;
this.transformation = tranformation;
this.isPartitioned = isPartitioned;
@@ -151,8 +151,8 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
: new ParallelMerge<OUT>(reduceFunction);
return reduced.discretizedStream
- .groupBy(new WindowKey<OUT>())
- .connect(numOfParts.groupBy(0))
+ .keyBy(new WindowKey<OUT>())
+ .connect(numOfParts.keyBy(0))
.transform(
"CoFlatMap",
reduced.discretizedStream.getType(),
@@ -218,9 +218,9 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
if (isGrouped()) {
DiscretizedStream<OUT> out = transform(transformation, "Window partitioner", getType(),
- new WindowPartitioner<OUT>(groupByKey)).setParallelism(parallelism);
+ new WindowPartitioner<OUT>(keyByKey)).setParallelism(parallelism);
- out.groupByKey = null;
+ out.keyByKey = null;
out.isPartitioned = true;
return out;
@@ -247,7 +247,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
// Only merge partitioned streams
if (isPartitioned) {
return wrap(
- discretizedStream.groupBy(new WindowKey<OUT>()).transform("Window Merger",
+ discretizedStream.keyBy(new WindowKey<OUT>()).transform("Window Merger",
type, new WindowMerger<OUT>()).setParallelism(discretizedStream.getParallelism()), false);
} else {
return this;
@@ -258,14 +258,14 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
@SuppressWarnings("unchecked")
private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator<StreamWindow<R>, ?> stream,
boolean isPartitioned) {
- return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.groupByKey,
+ return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.keyByKey,
transformation, isPartitioned);
}
@SuppressWarnings("unchecked")
private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator<StreamWindow<R>, ?> stream,
WindowTransformation transformation) {
- return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.groupByKey,
+ return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.keyByKey,
transformation, isPartitioned);
}
@@ -329,7 +329,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
}
protected DiscretizedStream<OUT> copy() {
- return new DiscretizedStream<OUT>(discretizedStream, groupByKey, transformation, isPartitioned);
+ return new DiscretizedStream<OUT>(discretizedStream, keyByKey, transformation, isPartitioned);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
deleted file mode 100644
index ebaeb56..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
-import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.operators.StreamGroupedFold;
-import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
-
-/**
- * A GroupedDataStream represents a {@link DataStream} which has been
- * partitioned by the given {@link KeySelector}. Operators like {@link #reduce},
- * {@link #fold} etc. can be applied on the {@link GroupedDataStream} to
- * get additional functionality by the grouping.
- *
- * @param <T> The type of the elements in the Grouped Stream.
- * @param <KEY> The type of the key in the Keyed Stream.
- */
-public class GroupedDataStream<T, KEY> extends KeyedStream<T, KEY> {
-
- /**
- * Creates a new {@link GroupedDataStream}, group inclusion is determined using
- * a {@link KeySelector} on the elements of the {@link DataStream}.
- *
- * @param dataStream Base stream of data
- * @param keySelector Function for determining group inclusion
- */
- public GroupedDataStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
- super(dataStream, keySelector);
- }
-
-
- /**
- * Applies a reduce transformation on the grouped data stream grouped on by
- * the given key position. The {@link ReduceFunction} will receive input
- * values based on the key value. Only input values with the same key will
- * go to the same reducer.
- *
- * @param reducer
- * The {@link ReduceFunction} that will be called for every
- * element of the input values with the same key.
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer) {
- return transform("Grouped Reduce", getType(), new StreamGroupedReduce<T>(
- clean(reducer), keySelector));
- }
-
- /**
- * Applies a fold transformation on the grouped data stream grouped on by
- * the given key position. The {@link FoldFunction} will receive input
- * values based on the key value. Only input values with the same key will
- * go to the same folder.
- *
- * @param folder
- * The {@link FoldFunction} that will be called for every element
- * of the input values with the same key.
- * @param initialValue
- * The initialValue passed to the folders for each key.
- * @return The transformed DataStream.
- */
- public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> folder) {
-
- TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
- Utils.getCallLocationName(), true);
-
- return transform("Grouped Fold", outType, new StreamGroupedFold<T, R>(clean(folder),
- keySelector, initialValue));
- }
-
- /**
- * Applies an aggregation that gives a rolling sum of the data stream at the
- * given position grouped by the given key. An independent aggregate is kept
- * per key.
- *
- * @param positionToSum
- * The position in the data point to sum
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
- return aggregate(new SumAggregator<T>(positionToSum, getType(), getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that that gives the current sum of the pojo data
- * stream at the given field expressionby the given key. An independent
- * aggregate is kept per key. A field expression is either the name of a
- * public field or a getter method with parentheses of the
- * {@link DataStream}S underlying type. A dot can be used to drill down into
- * objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field
- * The field expression based on which the aggregation will be
- * applied.
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> sum(String field) {
- return aggregate(new SumAggregator<T>(field, getType(), getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that that gives the current minimum of the data
- * stream at the given position by the given key. An independent aggregate
- * is kept per key.
- *
- * @param positionToMin
- * The position in the data point to minimize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
- return aggregate(new ComparableAggregator<T>(positionToMin, getType(), AggregationType.MIN,
- getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that that gives the current minimum of the pojo
- * data stream at the given field expression by the given key. An
- * independent aggregate is kept per key. A field expression is either the
- * name of a public field or a getter method with parentheses of the
- * {@link DataStream}S underlying type. A dot can be used to drill down into
- * objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field
- * The field expression based on which the aggregation will be
- * applied.
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> min(String field) {
- return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MIN,
- false, getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the current maximum of the data stream
- * at the given position by the given key. An independent aggregate is kept
- * per key.
- *
- * @param positionToMax
- * The position in the data point to maximize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
- return aggregate(new ComparableAggregator<T>(positionToMax, getType(), AggregationType.MAX,
- getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that that gives the current maximum of the pojo
- * data stream at the given field expression by the given key. An
- * independent aggregate is kept per key. A field expression is either the
- * name of a public field or a getter method with parentheses of the
- * {@link DataStream}S underlying type. A dot can be used to drill down into
- * objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field
- * The field expression based on which the aggregation will be
- * applied.
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> max(String field) {
- return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAX,
- false, getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that that gives the current minimum element of the
- * pojo data stream by the given field expression by the given key. An
- * independent aggregate is kept per key. A field expression is either the
- * name of a public field or a getter method with parentheses of the
- * {@link DataStream}S underlying type. A dot can be used to drill down into
- * objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field
- * The field expression based on which the aggregation will be
- * applied.
- * @param first
- * If True then in case of field equality the first object will
- * be returned
- * @return The transformed DataStream.
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) {
- return aggregate(new ComparableAggregator(field, getType(), AggregationType.MINBY,
- first, getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that that gives the current maximum element of the
- * pojo data stream by the given field expression by the given key. An
- * independent aggregate is kept per key. A field expression is either the
- * name of a public field or a getter method with parentheses of the
- * {@link DataStream}S underlying type. A dot can be used to drill down into
- * objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field
- * The field expression based on which the aggregation will be
- * applied.
- * @param first
- * If True then in case of field equality the first object will
- * be returned
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) {
- return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAXBY,
- first, getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that that gives the current element with the
- * minimum value at the given position by the given key. An independent
- * aggregate is kept per key. If more elements have the minimum value at the
- * given position, the operator returns the first one by default.
- *
- * @param positionToMinBy
- * The position in the data point to minimize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
- return this.minBy(positionToMinBy, true);
- }
-
- /**
- * Applies an aggregation that that gives the current element with the
- * minimum value at the given position by the given key. An independent
- * aggregate is kept per key. If more elements have the minimum value at the
- * given position, the operator returns the first one by default.
- *
- * @param positionToMinBy
- * The position in the data point to minimize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
- return this.minBy(positionToMinBy, true);
- }
-
- /**
- * Applies an aggregation that that gives the current element with the
- * minimum value at the given position by the given key. An independent
- * aggregate is kept per key. If more elements have the minimum value at the
- * given position, the operator returns either the first or last one,
- * depending on the parameter set.
- *
- * @param positionToMinBy
- * The position in the data point to minimize
- * @param first
- * If true, then the operator return the first element with the
- * minimal value, otherwise returns the last
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) {
- return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationType.MINBY, first,
- getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that that gives the current element with the
- * maximum value at the given position by the given key. An independent
- * aggregate is kept per key. If more elements have the maximum value at the
- * given position, the operator returns the first one by default.
- *
- * @param positionToMaxBy
- * The position in the data point to maximize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
- return this.maxBy(positionToMaxBy, true);
- }
-
- /**
- * Applies an aggregation that that gives the current element with the
- * maximum value at the given position by the given key. An independent
- * aggregate is kept per key. If more elements have the maximum value at the
- * given position, the operator returns the first one by default.
- *
- * @param positionToMaxBy
- * The position in the data point to maximize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
- return this.maxBy(positionToMaxBy, true);
- }
-
- /**
- * Applies an aggregation that that gives the current element with the
- * maximum value at the given position by the given key. An independent
- * aggregate is kept per key. If more elements have the maximum value at the
- * given position, the operator returns either the first or last one,
- * depending on the parameter set.
- *
- * @param positionToMaxBy
- * The position in the data point to maximize.
- * @param first
- * If true, then the operator return the first element with the
- * maximum value, otherwise returns the last
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) {
- return aggregate(new ComparableAggregator<T>(positionToMaxBy, getType(), AggregationType.MAXBY, first,
- getExecutionConfig()));
- }
-
- protected SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregate) {
- StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(clean(aggregate), keySelector);
- return transform("Grouped Aggregation", getType(), operator);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 2fe3848..75216ec 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -188,20 +188,17 @@ public class IterativeDataStream<T> extends SingleOutputStreamOperator<T, Iterat
"Cannot change the input partitioning of an iteration head directly. Apply the partitioning on the input and feedback streams instead.");
@Override
- public ConnectedStreams<I, F> groupBy(int keyPosition1, int keyPosition2) {throw groupingException;}
-
- @Override
- public ConnectedStreams<I, F> groupBy(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
-
+ public ConnectedStreams<I, F> keyBy(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
+
@Override
- public ConnectedStreams<I, F> groupBy(String field1, String field2) {throw groupingException;}
-
+ public ConnectedStreams<I, F> keyBy(String field1, String field2) {throw groupingException;}
+
@Override
- public ConnectedStreams<I, F> groupBy(String[] fields1, String[] fields2) {throw groupingException;}
-
+ public ConnectedStreams<I, F> keyBy(String[] fields1, String[] fields2) {throw groupingException;}
+
@Override
- public ConnectedStreams<I, F> groupBy(KeySelector<I, ?> keySelector1,KeySelector<F, ?> keySelector2) {throw groupingException;}
-
+ public ConnectedStreams<I, F> keyBy(KeySelector<I, ?> keySelector1,KeySelector<F, ?> keySelector2) {throw groupingException;}
+
@Override
public ConnectedStreams<I, F> partitionByHash(int keyPosition1, int keyPosition2) {throw groupingException;}
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index b3cfb55..265886b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -17,10 +17,19 @@
package org.apache.flink.streaming.api.datastream;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamGroupedFold;
+import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
@@ -39,9 +48,12 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
* A {@code KeyedStream} represents a {@link DataStream} on which operator state is
* partitioned by key using a provided {@link KeySelector}. Typical operations supported by a
* {@code DataStream} are also possible on a {@code KeyedStream}, with the exception of
- * partitioning methods such as shuffle, forward and groupBy.
- *
- *
+ * partitioning methods such as shuffle, forward and keyBy.
+ *
+ * <p>
+ * Reduce-style operations, such as {@link #reduce}, {@link #sum} and {@link #fold} work on elements
+ * that have the same key.
+ *
* @param <T> The type of the elements in the Keyed Stream.
* @param <KEY> The type of the key in the Keyed Stream.
*/
@@ -59,7 +71,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
* Function for determining state partitions
*/
public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
- super(dataStream.getExecutionEnvironment(), new PartitionTransformation<T>(dataStream.getTransformation(), new HashPartitioner<T>(keySelector)));
+ super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
this.keySelector = keySelector;
}
@@ -157,4 +169,281 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
return new WindowedStream<>(this, assigner);
}
+
+ // ------------------------------------------------------------------------
+ // Non-Windowed aggregation operations
+ // ------------------------------------------------------------------------
+
+ /**
+ * Applies a reduce transformation on the grouped data stream grouped on by
+ * the given key position. The {@link ReduceFunction} will receive input
+ * values based on the key value. Only input values with the same key will
+ * go to the same reducer.
+ *
+ * @param reducer
+ * The {@link ReduceFunction} that will be called for every
+ * element of the input values with the same key.
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer) {
+ return transform("Keyed Reduce", getType(), new StreamGroupedReduce<>(clean(reducer), keySelector));
+ }
+
+ /**
+ * Applies a fold transformation on the grouped data stream grouped on by
+ * the given key position. The {@link FoldFunction} will receive input
+ * values based on the key value. Only input values with the same key will
+ * go to the same folder.
+ *
+ * @param folder
+ * The {@link FoldFunction} that will be called for every element
+ * of the input values with the same key.
+ * @param initialValue
+ * The initialValue passed to the folders for each key.
+ * @return The transformed DataStream.
+ */
+ public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> folder) {
+
+ TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
+ Utils.getCallLocationName(), true);
+
+ return transform("Keyed Fold", outType, new StreamGroupedFold<>(clean(folder),
+ keySelector, initialValue));
+ }
+
+ /**
+ * Applies an aggregation that gives a rolling sum of the data stream at the
+ * given position grouped by the given key. An independent aggregate is kept
+ * per key.
+ *
+ * @param positionToSum
+ * The position in the data point to sum
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
+ return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the current sum of the pojo data
+ * stream at the given field expression by the given key. An independent
+ * aggregate is kept per key. A field expression is either the name of a
+ * public field or a getter method with parentheses of the
+ * {@link DataStream}S underlying type. A dot can be used to drill down into
+ * objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field
+ * The field expression based on which the aggregation will be
+ * applied.
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> sum(String field) {
+ return aggregate(new SumAggregator<>(field, getType(), getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the current minimum of the data
+ * stream at the given position by the given key. An independent aggregate
+ * is kept per key.
+ *
+ * @param positionToMin
+ * The position in the data point to minimize
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
+ return aggregate(new ComparableAggregator<>(positionToMin, getType(), AggregationFunction.AggregationType.MIN,
+ getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the current minimum of the pojo
+ * data stream at the given field expression by the given key. An
+ * independent aggregate is kept per key. A field expression is either the
+ * name of a public field or a getter method with parentheses of the
+ * {@link DataStream}S underlying type. A dot can be used to drill down into
+ * objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field
+ * The field expression based on which the aggregation will be
+ * applied.
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> min(String field) {
+ return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MIN,
+ false, getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the current maximum of the data stream
+ * at the given position by the given key. An independent aggregate is kept
+ * per key.
+ *
+ * @param positionToMax
+ * The position in the data point to maximize
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
+ return aggregate(new ComparableAggregator<>(positionToMax, getType(), AggregationFunction.AggregationType.MAX,
+ getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the current maximum of the pojo
+ * data stream at the given field expression by the given key. An
+ * independent aggregate is kept per key. A field expression is either the
+ * name of a public field or a getter method with parentheses of the
+ * {@link DataStream}S underlying type. A dot can be used to drill down into
+ * objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field
+ * The field expression based on which the aggregation will be
+ * applied.
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> max(String field) {
+ return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAX,
+ false, getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the current minimum element of the
+ * pojo data stream by the given field expression by the given key. An
+ * independent aggregate is kept per key. A field expression is either the
+ * name of a public field or a getter method with parentheses of the
+ * {@link DataStream}S underlying type. A dot can be used to drill down into
+ * objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field
+ * The field expression based on which the aggregation will be
+ * applied.
+ * @param first
+ * If true, then in case of field equality the first object will
+ * be returned
+ * @return The transformed DataStream.
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) {
+ return aggregate(new ComparableAggregator(field, getType(), AggregationFunction.AggregationType.MINBY,
+ first, getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the current maximum element of the
+ * pojo data stream at the given field expression by the given key. An
+ * independent aggregate is kept per key. A field expression is either the
+ * name of a public field or a getter method with parentheses of the
+ * {@link DataStream}'s underlying type. A dot can be used to drill down into
+ * objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field
+ * The field expression based on which the aggregation will be
+ * applied.
+ * @param first
+ * If true, then in case of field equality the first object will
+ * be returned
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) {
+ return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAXBY,
+ first, getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the current element with the
+ * minimum value at the given position by the given key. An independent
+ * aggregate is kept per key. If multiple elements have the minimum value at the
+ * given position, the operator returns the first one by default.
+ *
+ * @param positionToMinBy
+ * The position in the data point to minimize
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
+ return this.minBy(positionToMinBy, true);
+ }
+
+ /**
+ * Applies an aggregation that gives the current element with the
+ * minimum value at the given field expression by the given key. An independent
+ * aggregate is kept per key. If multiple elements have the minimum value at the
+ * given field expression, the operator returns the first one by default.
+ *
+ * @param positionToMinBy
+ * The field expression in the data point to minimize
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
+ return this.minBy(positionToMinBy, true);
+ }
+
+ /**
+ * Applies an aggregation that gives the current element with the
+ * minimum value at the given position by the given key. An independent
+ * aggregate is kept per key. If multiple elements have the minimum value at the
+ * given position, the operator returns either the first or last one,
+ * depending on the parameter set.
+ *
+ * @param positionToMinBy
+ * The position in the data point to minimize
+ * @param first
+ * If true, then the operator returns the first element with the
+ * minimum value, otherwise returns the last
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) {
+ return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationFunction.AggregationType.MINBY, first,
+ getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the current element with the
+ * maximum value at the given position by the given key. An independent
+ * aggregate is kept per key. If multiple elements have the maximum value at the
+ * given position, the operator returns the first one by default.
+ *
+ * @param positionToMaxBy
+ * The position in the data point to maximize
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
+ return this.maxBy(positionToMaxBy, true);
+ }
+
+ /**
+ * Applies an aggregation that gives the current element with the
+ * maximum value at the given field expression by the given key. An independent
+ * aggregate is kept per key. If multiple elements have the maximum value at the
+ * given field expression, the operator returns the first one by default.
+ *
+ * @param positionToMaxBy
+ * The field expression in the data point to maximize
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
+ return this.maxBy(positionToMaxBy, true);
+ }
+
+ /**
+ * Applies an aggregation that gives the current element with the
+ * maximum value at the given position by the given key. An independent
+ * aggregate is kept per key. If multiple elements have the maximum value at the
+ * given position, the operator returns either the first or last one,
+ * depending on the parameter set.
+ *
+ * @param positionToMaxBy
+ * The position in the data point to maximize.
+ * @param first
+ * If true, then the operator returns the first element with the
+ * maximum value, otherwise returns the last
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) {
+ return aggregate(new ComparableAggregator<>(positionToMaxBy, getType(), AggregationFunction.AggregationType.MAXBY, first,
+ getExecutionConfig()));
+ }
+
+ protected SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregate) {
+ StreamGroupedReduce<T> operator = new StreamGroupedReduce<>(clean(aggregate), keySelector);
+ return transform("Keyed Aggregation", getType(), operator);
+ }
}
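For reference, a minimal usage sketch of the renamed API (not part of this commit; the class name, input values, and job name are made up for illustration). It shows keyBy, which replaces groupBy and yields a KeyedStream, followed by one of the per-key aggregations documented above:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KeyedAggregationSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// Illustrative (word, count) pairs; any Tuple2 stream would do.
		DataStream<Tuple2<String, Integer>> counts = env.fromElements(
				new Tuple2<>("flink", 1),
				new Tuple2<>("streaming", 2),
				new Tuple2<>("flink", 3));
		// keyBy(0) partitions the stream by the first tuple field and returns a
		// KeyedStream; sum(1) keeps an independent running sum per key.
		counts.keyBy(0)
				.sum(1)
				.print();
		env.execute("keyed aggregation sketch");
	}
}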
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index ef6f53b..c1c5f6d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -89,7 +89,7 @@ public class WindowedDataStream<T> {
protected boolean isLocal = false;
protected KeySelector<T, ?> discretizerKey;
- protected KeySelector<T, ?> groupByKey;
+ protected KeySelector<T, ?> keyByKey;
protected WindowingHelper<T> triggerHelper;
protected WindowingHelper<T> evictionHelper;
@@ -101,8 +101,8 @@ public class WindowedDataStream<T> {
this.dataStream = dataStream;
this.triggerHelper = policyHelper;
- if (dataStream instanceof GroupedDataStream) {
- this.discretizerKey = ((GroupedDataStream<T, ?>) dataStream).keySelector;
+ if (dataStream instanceof KeyedStream) {
+ this.discretizerKey = ((KeyedStream<T, ?>) dataStream).keySelector;
}
}
@@ -113,15 +113,15 @@ public class WindowedDataStream<T> {
this.userTrigger = trigger;
this.userEvicter = evicter;
- if (dataStream instanceof GroupedDataStream) {
- this.discretizerKey = ((GroupedDataStream<T, ?>) dataStream).keySelector;
+ if (dataStream instanceof KeyedStream) {
+ this.discretizerKey = ((KeyedStream<T, ?>) dataStream).keySelector;
}
}
protected WindowedDataStream(WindowedDataStream<T> windowedDataStream) {
this.dataStream = windowedDataStream.dataStream;
this.discretizerKey = windowedDataStream.discretizerKey;
- this.groupByKey = windowedDataStream.groupByKey;
+ this.keyByKey = windowedDataStream.keyByKey;
this.triggerHelper = windowedDataStream.triggerHelper;
this.evictionHelper = windowedDataStream.evictionHelper;
this.userTrigger = windowedDataStream.userTrigger;
@@ -170,11 +170,11 @@ public class WindowedDataStream<T> {
* The position of the fields to group by.
* @return The grouped {@link WindowedDataStream}
*/
- public WindowedDataStream<T> groupBy(int... fields) {
+ public WindowedDataStream<T> keyBy(int... fields) {
if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
- return groupBy(new KeySelectorUtil.ArrayKeySelector<T>(fields));
+ return keyBy(new KeySelectorUtil.ArrayKeySelector<T>(fields));
} else {
- return groupBy(new Keys.ExpressionKeys<T>(fields, getType()));
+ return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
}
}
@@ -194,8 +194,8 @@ public class WindowedDataStream<T> {
* The fields to group by
* @return The grouped {@link WindowedDataStream}
*/
- public WindowedDataStream<T> groupBy(String... fields) {
- return groupBy(new Keys.ExpressionKeys<T>(fields, getType()));
+ public WindowedDataStream<T> keyBy(String... fields) {
+ return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
}
/**
@@ -210,14 +210,14 @@ public class WindowedDataStream<T> {
* The keySelector used to extract the key for grouping.
* @return The grouped {@link WindowedDataStream}
*/
- public WindowedDataStream<T> groupBy(KeySelector<T, ?> keySelector) {
+ public WindowedDataStream<T> keyBy(KeySelector<T, ?> keySelector) {
WindowedDataStream<T> ret = this.copy();
- ret.groupByKey = keySelector;
+ ret.keyByKey = keySelector;
return ret;
}
- private WindowedDataStream<T> groupBy(Keys<T> keys) {
- return groupBy(clean(KeySelectorUtil.getSelectorForKeys(keys, getType(),
+ private WindowedDataStream<T> keyBy(Keys<T> keys) {
+ return keyBy(clean(KeySelectorUtil.getSelectorForKeys(keys, getType(),
getExecutionConfig())));
}
@@ -398,7 +398,7 @@ public class WindowedDataStream<T> {
.setParallelism(parallelism)
.transform(windowBuffer.getClass().getSimpleName(),
new StreamWindowTypeInfo<T>(getType()), bufferOperator)
- .setParallelism(parallelism), groupByKey, transformation, false);
+ .setParallelism(parallelism), keyByKey, transformation, false);
}
@@ -442,8 +442,8 @@ public class WindowedDataStream<T> {
// If there is a groupby for the reduce operation we apply it before the
// discretizers, because we will forward everything afterwards to
// exploit task chaining
- if (groupByKey != null) {
- dataStream = dataStream.groupBy(groupByKey);
+ if (keyByKey != null) {
+ dataStream = dataStream.keyBy(keyByKey);
}
// We discretize the stream and call the timeReduce function of the
@@ -502,28 +502,28 @@ public class WindowedDataStream<T> {
if (transformation == WindowTransformation.REDUCEWINDOW) {
if (WindowUtils.isTumblingPolicy(trigger, eviction)) {
if (eviction instanceof KeepAllEvictionPolicy) {
- if (groupByKey == null) {
+ if (keyByKey == null) {
return new TumblingPreReducer<T>(
(ReduceFunction<T>) transformation.getUDF(), getType()
.createSerializer(getExecutionConfig())).noEvict();
} else {
return new TumblingGroupedPreReducer<T>(
- (ReduceFunction<T>) transformation.getUDF(), groupByKey,
+ (ReduceFunction<T>) transformation.getUDF(), keyByKey,
getType().createSerializer(getExecutionConfig())).noEvict();
}
} else {
- if (groupByKey == null) {
+ if (keyByKey == null) {
return new TumblingPreReducer<T>(
(ReduceFunction<T>) transformation.getUDF(), getType()
.createSerializer(getExecutionConfig()));
} else {
return new TumblingGroupedPreReducer<T>(
- (ReduceFunction<T>) transformation.getUDF(), groupByKey,
+ (ReduceFunction<T>) transformation.getUDF(), keyByKey,
getType().createSerializer(getExecutionConfig()));
}
}
} else if (WindowUtils.isSlidingCountPolicy(trigger, eviction)) {
- if (groupByKey == null) {
+ if (keyByKey == null) {
return new SlidingCountPreReducer<T>(
clean((ReduceFunction<T>) transformation.getUDF()), dataStream
.getType().createSerializer(getExecutionConfig()),
@@ -532,13 +532,13 @@ public class WindowedDataStream<T> {
} else {
return new SlidingCountGroupedPreReducer<T>(
clean((ReduceFunction<T>) transformation.getUDF()), dataStream
- .getType().createSerializer(getExecutionConfig()), groupByKey,
+ .getType().createSerializer(getExecutionConfig()), keyByKey,
WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
((CountTriggerPolicy<?>) trigger).getStart());
}
} else if (WindowUtils.isSlidingTimePolicy(trigger, eviction)) {
- if (groupByKey == null) {
+ if (keyByKey == null) {
return new SlidingTimePreReducer<T>(
(ReduceFunction<T>) transformation.getUDF(), dataStream.getType()
.createSerializer(getExecutionConfig()),
@@ -547,25 +547,25 @@ public class WindowedDataStream<T> {
} else {
return new SlidingTimeGroupedPreReducer<T>(
(ReduceFunction<T>) transformation.getUDF(), dataStream.getType()
- .createSerializer(getExecutionConfig()), groupByKey,
+ .createSerializer(getExecutionConfig()), keyByKey,
WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
WindowUtils.getTimeStampWrapper(trigger));
}
} else if (WindowUtils.isJumpingCountPolicy(trigger, eviction)) {
- if (groupByKey == null) {
+ if (keyByKey == null) {
return new JumpingCountPreReducer<T>(
(ReduceFunction<T>) transformation.getUDF(), getType()
.createSerializer(getExecutionConfig()),
WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
} else {
return new JumpingCountGroupedPreReducer<T>(
- (ReduceFunction<T>) transformation.getUDF(), groupByKey, getType()
+ (ReduceFunction<T>) transformation.getUDF(), keyByKey, getType()
.createSerializer(getExecutionConfig()),
WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
}
} else if (WindowUtils.isJumpingTimePolicy(trigger, eviction)) {
- if (groupByKey == null) {
+ if (keyByKey == null) {
return new JumpingTimePreReducer<T>(
(ReduceFunction<T>) transformation.getUDF(), getType()
.createSerializer(getExecutionConfig()),
@@ -573,7 +573,7 @@ public class WindowedDataStream<T> {
WindowUtils.getTimeStampWrapper(trigger));
} else {
return new JumpingTimeGroupedPreReducer<T>(
- (ReduceFunction<T>) transformation.getUDF(), groupByKey, getType()
+ (ReduceFunction<T>) transformation.getUDF(), keyByKey, getType()
.createSerializer(getExecutionConfig()),
WindowUtils.getSlideSize(trigger), WindowUtils.getWindowSize(eviction),
WindowUtils.getTimeStampWrapper(trigger));
@@ -845,7 +845,7 @@ public class WindowedDataStream<T> {
}
protected boolean isGrouped() {
- return groupByKey != null;
+ return keyByKey != null;
}
/**
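Both KeyedStream (above) and the renamed WindowedDataStream#keyBy accept field expressions on POJO types, as the Javadoc describes (public fields, or fields exposed via getters, with dots for nesting). A hedged sketch of that form at the DataStream level; the POJO and its values are invented for illustration:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FieldExpressionSketch {
	// Illustrative POJO: "word" is a public field, "count" is exposed via getter/setter.
	public static class WordCount {
		public String word;
		private int count;
		public WordCount() {}
		public WordCount(String word, int count) {
			this.word = word;
			this.count = count;
		}
		public int getCount() {
			return count;
		}
		public void setCount(int count) {
			this.count = count;
		}
	}
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<WordCount> words = env.fromElements(
				new WordCount("flink", 1), new WordCount("flink", 3), new WordCount("kafka", 2));
		// Key by the field expression "word"; maxBy("count", true) keeps, per key,
		// the element with the largest count and returns the first one on ties.
		words.keyBy("word")
				.maxBy("count", true)
				.print();
		env.execute("field expression sketch");
	}
}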
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
index 999d197..4a5622d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
@@ -211,8 +211,8 @@ public class StreamJoinOperator<I1, I2> extends
// op.input1.getType(), op.input2.getType());
// return new JoinedStream<I1, I2, Tuple2<I1, I2>>(this, op.input1
-// .groupBy(keys1)
-// .connect(op.input2.groupBy(keys2))
+// .keyBy(keys1)
+// .connect(op.input2.keyBy(keys2))
// .addGeneralWindowCombine(joinWindowFunction, outType, op.windowSize,
// op.slideInterval, op.timeStamp1, op.timeStamp2));
return null;
@@ -244,8 +244,8 @@ public class StreamJoinOperator<I1, I2> extends
// return new JoinedStream<I1, I2, OUT>(
// predicate, predicate.op.input1
-// .groupBy(predicate.keys1)
-// .connect(predicate.op.input2.groupBy(predicate.keys2))
+// .keyBy(predicate.keys1)
+// .connect(predicate.op.input2.keyBy(predicate.keys2))
// .addGeneralWindowCombine(joinWindowFunction, outType, predicate.op.windowSize,
// predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2));
return null;
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
index 7ea1309..0f9cbe9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
@@ -59,7 +59,7 @@ public class CoStreamTest extends StreamingMultipleProgramsTestBase {
public boolean filter(Integer value) throws Exception {
return true;
}
- }).groupBy(new KeySelector<Integer, Integer>() {
+ }).keyBy(new KeySelector<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@@ -88,7 +88,7 @@ public class CoStreamTest extends StreamingMultipleProgramsTestBase {
public boolean filter(Tuple2<Integer, Integer> value) throws Exception {
return true;
}
- }).disableChaining().groupBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
+ }).disableChaining().keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
private static final long serialVersionUID = 1L;
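The test changes here are mechanical groupBy to keyBy renames. For completeness, a hedged standalone sketch of the KeySelector form of keyBy used in this test (input values are made up; it keys integers by parity and sums per key via reduce):
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KeySelectorSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6);
		numbers
				// The KeySelector derives the key from each element; here: parity.
				.keyBy(new KeySelector<Integer, Integer>() {
					private static final long serialVersionUID = 1L;
					@Override
					public Integer getKey(Integer value) throws Exception {
						return value % 2;
					}
				})
				// Running sum per key (per parity class).
				.reduce(new ReduceFunction<Integer>() {
					@Override
					public Integer reduce(Integer a, Integer b) {
						return a + b;
					}
				})
				.print();
		env.execute("key selector sketch");
	}
}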
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 337d97b..55bf889 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -38,7 +38,7 @@ import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.GroupedDataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.datastream.WindowedDataStream;
@@ -125,7 +125,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
}
/**
- * Tests that {@link DataStream#groupBy} and {@link DataStream#partitionByHash} result in
+ * Tests that {@link DataStream#keyBy} and {@link DataStream#partitionByHash} result in
* different and correct topologies. Does the same for the {@link ConnectedStreams}.
*/
@Test
@@ -138,10 +138,10 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
ConnectedStreams connected = src1.connect(src2);
//Testing DataStream grouping
- DataStream group1 = src1.groupBy(0);
- DataStream group2 = src1.groupBy(1, 0);
- DataStream group3 = src1.groupBy("f0");
- DataStream group4 = src1.groupBy(new FirstSelector());
+ DataStream group1 = src1.keyBy(0);
+ DataStream group2 = src1.keyBy(1, 0);
+ DataStream group3 = src1.keyBy("f0");
+ DataStream group4 = src1.keyBy(new FirstSelector());
int id1 = createDownStreamId(group1);
int id2 = createDownStreamId(group2);
@@ -153,10 +153,10 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), id3)));
assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), id4)));
- assertTrue(isGrouped(group1));
- assertTrue(isGrouped(group2));
- assertTrue(isGrouped(group3));
- assertTrue(isGrouped(group4));
+ assertTrue(isKeyed(group1));
+ assertTrue(isKeyed(group2));
+ assertTrue(isKeyed(group3));
+ assertTrue(isKeyed(group4));
//Testing DataStream partitioning
DataStream partition1 = src1.partitionByHash(0);
@@ -174,10 +174,10 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), pid3)));
assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), pid4)));
- assertFalse(isGrouped(partition1));
- assertFalse(isGrouped(partition3));
- assertFalse(isGrouped(partition2));
- assertFalse(isGrouped(partition4));
+ assertFalse(isKeyed(partition1));
+ assertFalse(isKeyed(partition3));
+ assertFalse(isKeyed(partition2));
+ assertFalse(isKeyed(partition4));
// Testing DataStream custom partitioning
Partitioner<Long> longPartitioner = new Partitioner<Long>() {
@@ -199,24 +199,24 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
assertTrue(isCustomPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), cid2)));
assertTrue(isCustomPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), cid3)));
- assertFalse(isGrouped(customPartition1));
- assertFalse(isGrouped(customPartition3));
- assertFalse(isGrouped(customPartition4));
+ assertFalse(isKeyed(customPartition1));
+ assertFalse(isKeyed(customPartition3));
+ assertFalse(isKeyed(customPartition4));
//Testing ConnectedStreams grouping
- ConnectedStreams connectedGroup1 = connected.groupBy(0, 0);
+ ConnectedStreams connectedGroup1 = connected.keyBy(0, 0);
Integer downStreamId1 = createDownStreamId(connectedGroup1);
- ConnectedStreams connectedGroup2 = connected.groupBy(new int[]{0}, new int[]{0});
+ ConnectedStreams connectedGroup2 = connected.keyBy(new int[]{0}, new int[]{0});
Integer downStreamId2 = createDownStreamId(connectedGroup2);
- ConnectedStreams connectedGroup3 = connected.groupBy("f0", "f0");
+ ConnectedStreams connectedGroup3 = connected.keyBy("f0", "f0");
Integer downStreamId3 = createDownStreamId(connectedGroup3);
- ConnectedStreams connectedGroup4 = connected.groupBy(new String[]{"f0"}, new String[]{"f0"});
+ ConnectedStreams connectedGroup4 = connected.keyBy(new String[]{"f0"}, new String[]{"f0"});
Integer downStreamId4 = createDownStreamId(connectedGroup4);
- ConnectedStreams connectedGroup5 = connected.groupBy(new FirstSelector(), new FirstSelector());
+ ConnectedStreams connectedGroup5 = connected.keyBy(new FirstSelector(), new FirstSelector());
Integer downStreamId5 = createDownStreamId(connectedGroup5);
assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId1)));
@@ -234,11 +234,11 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId5)));
assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId5)));
- assertTrue(isGrouped(connectedGroup1));
- assertTrue(isGrouped(connectedGroup2));
- assertTrue(isGrouped(connectedGroup3));
- assertTrue(isGrouped(connectedGroup4));
- assertTrue(isGrouped(connectedGroup5));
+ assertTrue(isKeyed(connectedGroup1));
+ assertTrue(isKeyed(connectedGroup2));
+ assertTrue(isKeyed(connectedGroup3));
+ assertTrue(isKeyed(connectedGroup4));
+ assertTrue(isKeyed(connectedGroup5));
//Testing ConnectedStreams partitioning
ConnectedStreams connectedPartition1 = connected.partitionByHash(0, 0);
@@ -281,11 +281,11 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
connectDownStreamId5)));
- assertFalse(isGrouped(connectedPartition1));
- assertFalse(isGrouped(connectedPartition2));
- assertFalse(isGrouped(connectedPartition3));
- assertFalse(isGrouped(connectedPartition4));
- assertFalse(isGrouped(connectedPartition5));
+ assertFalse(isKeyed(connectedPartition1));
+ assertFalse(isKeyed(connectedPartition2));
+ assertFalse(isKeyed(connectedPartition3));
+ assertFalse(isKeyed(connectedPartition4));
+ assertFalse(isKeyed(connectedPartition5));
}
/**
@@ -601,8 +601,8 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
return dataStream.print().getTransformation().getId();
}
- private static boolean isGrouped(DataStream dataStream) {
- return dataStream instanceof GroupedDataStream;
+ private static boolean isKeyed(DataStream dataStream) {
+ return dataStream instanceof KeyedStream;
}
private static Integer createDownStreamId(ConnectedStreams dataStream) {
@@ -621,8 +621,8 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
return coMap.getId();
}
- private static boolean isGrouped(ConnectedStreams dataStream) {
- return (dataStream.getFirstInput() instanceof GroupedDataStream && dataStream.getSecondInput() instanceof GroupedDataStream);
+ private static boolean isKeyed(ConnectedStreams dataStream) {
+ return (dataStream.getFirstInput() instanceof KeyedStream && dataStream.getSecondInput() instanceof KeyedStream);
}
private static boolean isPartitioned(StreamEdge edge) {
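The test above distinguishes keyed streams from merely hash-partitioned ones. A hedged sketch of that distinction with made-up input; partitionByHash, as exercised by this test, repartitions the stream without making it keyed:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KeyByVersusPartitionByHashSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<Tuple2<Long, Long>> src = env.fromElements(
				new Tuple2<>(1L, 10L), new Tuple2<>(2L, 20L), new Tuple2<>(1L, 30L));
		// keyBy yields a KeyedStream, so per-key aggregations are available downstream.
		src.keyBy(0).sum(1).print();
		// partitionByHash only changes the physical partitioning; the result is a
		// plain DataStream with no per-key aggregations.
		src.partitionByHash(0).print();
		env.execute("keyBy vs partitionByHash sketch");
	}
}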
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 774f58d..43371b7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -401,7 +401,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
.withFeedbackType("String");
try {
- coIt.groupBy(1, 2);
+ coIt.keyBy(1, 2);
fail();
} catch (InvalidProgramException e) {
// this is expected
@@ -476,7 +476,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
DataStream<Integer> source = env.fromElements(1, 2, 3)
.map(NoOpIntMap).name("ParallelizeMap");
- IterativeDataStream<Integer> it = source.groupBy(key).iterate(3000);
+ IterativeDataStream<Integer> it = source.keyBy(key).iterate(3000);
DataStream<Integer> head = it.flatMap(new RichFlatMapFunction<Integer, Integer>() {
@@ -502,7 +502,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
}
});
- it.closeWith(head.groupBy(key).union(head.map(NoOpIntMap).groupBy(key))).addSink(new ReceiveCheckNoOpSink<Integer>());
+ it.closeWith(head.keyBy(key).union(head.map(NoOpIntMap).keyBy(key))).addSink(new ReceiveCheckNoOpSink<Integer>());
env.execute();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9baadfe8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
index 11100a4..6401546 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
@@ -78,7 +78,7 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
DataStream<Tuple2<Integer, Integer>> sourceStream = env.addSource(new TupleSource(numElements, numKeys));
SplitDataStream<Tuple2<Integer, Integer>> splittedResult = sourceStream
- .groupBy(0)
+ .keyBy(0)
.fold(0, new FoldFunction<Tuple2<Integer, Integer>, Integer>() {
@Override
public Integer fold(Integer accumulator, Tuple2<Integer, Integer> value) throws Exception {
@@ -146,7 +146,7 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements));
input
- .groupBy(0)
+ .keyBy(0)
.fold(
new NonSerializable(42),
new FoldFunction<Tuple2<Integer, NonSerializable>, NonSerializable>() {