You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/03 15:05:20 UTC

[4/4] flink git commit: [FLINK-2103] [streaming] [api-extending] Expose partitionBy to user

[FLINK-2103] [streaming] [api-extending] Expose partitionBy to user

Conflicts:
	flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java

Closes #743


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a43e0d5c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a43e0d5c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a43e0d5c

Branch: refs/heads/master
Commit: a43e0d5c4c84f92a3b12e4f410e803f17fa40039
Parents: bf9cc81
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu May 28 10:40:56 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed Jun 3 12:47:41 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    | 42 +++++++++++++++++++-
 1 file changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a43e0d5c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index b9225c9..db1f40f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -373,17 +373,55 @@ public class DataStream<OUT> {
 
 	/**
 	 * Sets the partitioning of the {@link DataStream} so that the output is
+	 * partitioned by hashing on the given fields. This setting only
+	 * affects how the outputs will be distributed between the parallel
+	 * instances of the next processing operator.
+	 *
+	 * @param fields The tuple fields that should be used for partitioning
+	 * @return The partitioned DataStream
+	 * Specifies how elements will be distributed to parallel instances of downstream operations.
+	 *
+	 */
+	public DataStream<OUT> partitionBy(int... fields) {
+		return partitionBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
+	}
+
+	/**
+	 * Sets the partitioning of the {@link DataStream} so that the output is
+	 * partitioned by hashing on the given fields. This setting only
+	 * affects how the outputs will be distributed between the parallel
+	 * instances of the next processing operator.
+	 *
+	 * @param fields The tuple fields that should be used for partitioning
+	 * @return The partitioned DataStream
+	 * Specifies how elements will be distributed to parallel instances of downstream operations.
+	 *
+	 */
+	public DataStream<OUT> partitionBy(String... fields) {
+		return partitionBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
+	}
+
+	/**
+	 * Sets the partitioning of the {@link DataStream} so that the output is
 	 * partitioned using the given {@link KeySelector}. This setting only
 	 * effects the how the outputs will be distributed between the parallel
 	 * instances of the next processing operator.
-	 * 
+	 *
 	 * @param keySelector
 	 * @return The partitioned DataStream
+	 * Specifies how elements will be distributed to parallel instances of downstream operations.
 	 */
-	protected DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) {
+	public DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) {
 		return setConnectionType(new FieldsPartitioner<OUT>(clean(keySelector)));
 	}
 
+	// Private helper: partitions the stream using a key selector derived from the given keys.
+	private DataStream<OUT> partitionBy(Keys<OUT> keys) {
+		return setConnectionType(
+				new FieldsPartitioner<OUT>(
+						clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig()))));
+	}
+
 	/**
 	 * Sets the partitioning of the {@link DataStream} so that the output tuples
 	 * are broadcasted to every parallel instance of the next component. This