You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/03 15:05:20 UTC
[4/4] flink git commit: [FLINK-2103] [streaming] [api-extending]
Expose partitionBy to user
[FLINK-2103] [streaming] [api-extending] Expose partitionBy to user
Conflicts:
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
Closes #743
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a43e0d5c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a43e0d5c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a43e0d5c
Branch: refs/heads/master
Commit: a43e0d5c4c84f92a3b12e4f410e803f17fa40039
Parents: bf9cc81
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu May 28 10:40:56 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed Jun 3 12:47:41 2015 +0200
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 42 +++++++++++++++++++-
1 file changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a43e0d5c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index b9225c9..db1f40f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -373,17 +373,55 @@ public class DataStream<OUT> {
/**
* Sets the partitioning of the {@link DataStream} so that the output is
+ * partitioned by hashing on the given fields. This setting only
+ * affects how the outputs will be distributed between the parallel
+ * instances of the next processing operator.
+ *
+ * @param fields The tuple fields that should be used for partitioning
+ * @return The partitioned DataStream
+ * whose partitioning specifies how elements will be distributed to parallel instances of downstream operations.
+ *
+ */
+ public DataStream<OUT> partitionBy(int... fields) {
+ return partitionBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
+ }
+
+ /**
+ * Sets the partitioning of the {@link DataStream} so that the output is
+ * partitioned by hashing on the given fields. This setting only
+ * affects how the outputs will be distributed between the parallel
+ * instances of the next processing operator.
+ *
+ * @param fields The fields, given as String expressions, that should be used for partitioning
+ * @return The partitioned DataStream
+ * whose partitioning specifies how elements will be distributed to parallel instances of downstream operations.
+ *
+ */
+ public DataStream<OUT> partitionBy(String... fields) {
+ return partitionBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
+ }
+
+ /**
+ * Sets the partitioning of the {@link DataStream} so that the output is
* partitioned using the given {@link KeySelector}. This setting only
* effects the how the outputs will be distributed between the parallel
* instances of the next processing operator.
- *
+ *
* @param keySelector
* @return The partitioned DataStream
+ * Specifies how elements will be distributed to parallel instances of downstream operations.
*/
- protected DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) {
+ public DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) {
return setConnectionType(new FieldsPartitioner<OUT>(clean(keySelector)));
}
+ // Private helper: sets a FieldsPartitioner built from the key selector derived from the given keys
+ private DataStream<OUT> partitionBy(Keys<OUT> keys) {
+ return setConnectionType(
+ new FieldsPartitioner<OUT>(
+ clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig()))));
+ }
+
/**
* Sets the partitioning of the {@link DataStream} so that the output tuples
* are broadcasted to every parallel instance of the next component. This