You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/03 15:05:18 UTC
[2/4] flink git commit: [FLINK-2103] [streaming] [api-extending]
PartitionBy for connected streams, scala & docs
[FLINK-2103] [streaming] [api-extending] PartitionBy for connected streams, scala & docs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b28bdf3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b28bdf3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b28bdf3
Branch: refs/heads/master
Commit: 6b28bdf359ba33cf106b96cf42fac0a431935330
Parents: a43e0d5
Author: mbalassi <mb...@apache.org>
Authored: Tue Jun 2 10:49:37 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed Jun 3 12:47:41 2015 +0200
----------------------------------------------------------------------
docs/apis/streaming_guide.md | 4 +-
.../api/datastream/ConnectedDataStream.java | 92 +++++++++++++++++++-
.../streaming/api/datastream/DataStream.java | 8 +-
.../api/datastream/WindowedDataStream.java | 2 +-
.../api/scala/ConnectedDataStream.scala | 88 ++++++++++++++++++-
.../flink/streaming/api/scala/DataStream.scala | 36 ++++++--
6 files changed, 213 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6b28bdf3/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 543499c..e041e45 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -282,7 +282,9 @@ Usage: `dataStream.forward()`
Usage: `dataStream.shuffle()`
* *Distribute*: Distribute partitioning directs the output data stream to the next operator in a round-robin fashion, achieving a balanced distribution.
Usage: `dataStream.distribute()`
- * *Field/Key*: Field/Key partitioning partitions the output data stream based on the hash code of a selected key of the tuples. Data points with the same key are directed to the same operator instance.
+ * *Field/Key Partitioning*: Field/Key partitioning partitions the output data stream based on the hash code of a selected key of the tuples. Data points with the same key are directed to the same operator instance.
+Usage: `dataStream.partitionBy(fields…)`
+* *Field/Key Grouping*: Field/Key grouping takes partitioning one step further and separates the elements into disjoint groups based on the hash code. These groups are processed separately by the next downstream operator.
Usage: `dataStream.groupBy(fields…)`
* *Broadcast*: Broadcast partitioning sends the output data stream to all parallel instances of the next operator.
Usage: `dataStream.broadcast()`
http://git-wip-us.apache.org/repos/asf/flink/blob/6b28bdf3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 02db538..35418e0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -145,7 +145,7 @@ public class ConnectedDataStream<IN1, IN2> {
* @param keyPosition2
* The field used to compute the hashcode of the elements in the
* second input stream.
- * @return @return The transformed {@link ConnectedDataStream}
+ * @return The grouped {@link ConnectedDataStream}
*/
public ConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
return new ConnectedDataStream<IN1, IN2>(dataStream1.groupBy(keyPosition1),
@@ -162,7 +162,7 @@ public class ConnectedDataStream<IN1, IN2> {
* The fields used to group the first input stream.
* @param keyPositions2
* The fields used to group the second input stream.
- * @return @return The transformed {@link ConnectedDataStream}
+ * @return The grouped {@link ConnectedDataStream}
*/
public ConnectedDataStream<IN1, IN2> groupBy(int[] keyPositions1, int[] keyPositions2) {
return new ConnectedDataStream<IN1, IN2>(dataStream1.groupBy(keyPositions1),
@@ -175,7 +175,7 @@ public class ConnectedDataStream<IN1, IN2> {
* expression is either the name of a public field or a getter method with
* parentheses of the {@link DataStream}S underlying type. A dot can be used
* to drill down into objects, as in {@code "field1.getInnerField2()" }.
- *
+ *
* @param field1
* The grouping expression for the first input
* @param field2
@@ -216,7 +216,7 @@ public class ConnectedDataStream<IN1, IN2> {
* The {@link KeySelector} used for grouping the first input
* @param keySelector2
* The {@link KeySelector} used for grouping the second input
- * @return @return The transformed {@link ConnectedDataStream}
+ * @return The grouped {@link ConnectedDataStream}
*/
public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1,
KeySelector<IN2, ?> keySelector2) {
@@ -225,6 +225,90 @@ public class ConnectedDataStream<IN1, IN2> {
}
/**
+ * PartitionBy operation for connected data stream. Partitions the elements of
+ * input1 and input2 according to keyPosition1 and keyPosition2.
+ *
+ * @param keyPosition1
+ * The field used to compute the hashcode of the elements in the
+ * first input stream.
+ * @param keyPosition2
+ * The field used to compute the hashcode of the elements in the
+ * second input stream.
+ * @return The partitioned {@link ConnectedDataStream}
+ */
+ public ConnectedDataStream<IN1, IN2> partitionBy(int keyPosition1, int keyPosition2) {
+ return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(keyPosition1),
+ dataStream2.partitionBy(keyPosition2));
+ }
+
+ /**
+ * PartitionBy operation for connected data stream. Partitions the elements of
+ * input1 and input2 according to keyPositions1 and keyPositions2.
+ *
+ * @param keyPositions1
+ * The fields used to partition the first input stream.
+ * @param keyPositions2
+ * The fields used to partition the second input stream.
+ * @return The partitioned {@link ConnectedDataStream}
+ */
+ public ConnectedDataStream<IN1, IN2> partitionBy(int[] keyPositions1, int[] keyPositions2) {
+ return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(keyPositions1),
+ dataStream2.partitionBy(keyPositions2));
+ }
+
+ /**
+ * PartitionBy operation for connected data stream using key expressions. Partitions
+ * the elements of input1 and input2 according to field1 and field2. A
+ * field expression is either the name of a public field or a getter method
+ * with parentheses of the {@link DataStream}s underlying type. A dot can be
+ * used to drill down into objects, as in {@code "field1.getInnerField2()" }
+ *
+ * @param field1
+ * The partitioning expression for the first input
+ * @param field2
+ * The partitioning expression for the second input
+ * @return The partitioned {@link ConnectedDataStream}
+ */
+ public ConnectedDataStream<IN1, IN2> partitionBy(String field1, String field2) {
+ return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(field1),
+ dataStream2.partitionBy(field2));
+ }
+
+ /**
+ * PartitionBy operation for connected data stream using key expressions. Partitions
+ * the elements of input1 and input2 according to fields1 and fields2. A
+ * field expression is either the name of a public field or a getter method
+ * with parentheses of the {@link DataStream}s underlying type. A dot can be
+ * used to drill down into objects, as in {@code "field1.getInnerField2()" }
+ *
+ * @param fields1
+ * The partitioning expressions for the first input
+ * @param fields2
+ * The partitioning expressions for the second input
+ * @return The partitioned {@link ConnectedDataStream}
+ */
+ public ConnectedDataStream<IN1, IN2> partitionBy(String[] fields1, String[] fields2) {
+ return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(fields1),
+ dataStream2.partitionBy(fields2));
+ }
+
+ /**
+ * PartitionBy operation for connected data stream. Partitions the elements of
+ * input1 and input2 using keySelector1 and keySelector2.
+ *
+ * @param keySelector1
+ * The {@link KeySelector} used for partitioning the first input
+ * @param keySelector2
+ * The {@link KeySelector} used for partitioning the second input
+ * @return The partitioned {@link ConnectedDataStream}
+ */
+ public ConnectedDataStream<IN1, IN2> partitionBy(KeySelector<IN1, ?> keySelector1,
+ KeySelector<IN2, ?> keySelector2) {
+ return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(keySelector1),
+ dataStream2.partitionBy(keySelector2));
+ }
+
+ /**
* Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
* the output to a common type. The transformation calls a
* {@link CoMapFunction#map1} for each element of the first input and
http://git-wip-us.apache.org/repos/asf/flink/blob/6b28bdf3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index db1f40f..7abd327 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -344,9 +344,7 @@ public class DataStream<OUT> {
* @return The grouped {@link DataStream}
**/
public GroupedDataStream<OUT> groupBy(String... fields) {
-
return groupBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
-
}
/**
@@ -383,7 +381,11 @@ public class DataStream<OUT> {
*
*/
public DataStream<OUT> partitionBy(int... fields) {
- return partitionBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
+ if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
+ return groupBy(new KeySelectorUtil.ArrayKeySelector<OUT>(fields));
+ } else {
+ return groupBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/6b28bdf3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 5d769ca..dd9af2f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -436,7 +436,7 @@ public class WindowedDataStream<OUT> {
.with(clean(reduceFunction));
// We get the windowbuffer and set it to emit empty windows with
- // sequential IDs. This logic is necessarry to merge windows created in
+ // sequential IDs. This logic is necessary to merge windows created in
// parallel.
WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation).emitEmpty().sequentialID();
http://git-wip-us.apache.org/repos/asf/flink/blob/6b28bdf3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
index 47d8fd2..ec2c6cc 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
@@ -233,7 +233,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
* The function used for grouping the first input
* @param fun2
* The function used for grouping the second input
- * @return @return The transformed { @link ConnectedDataStream}
+ * @return The grouped { @link ConnectedDataStream}
*/
def groupBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
ConnectedDataStream[IN1, IN2] = {
@@ -249,6 +249,92 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
}
/**
+ * PartitionBy operation for connected data stream. Partitions the elements of
+ * input1 and input2 according to keyPosition1 and keyPosition2.
+ *
+ * @param keyPosition1
+ * The field used to compute the hashcode of the elements in the
+ * first input stream.
+ * @param keyPosition2
+ * The field used to compute the hashcode of the elements in the
+ * second input stream.
+ * @return The partitioned { @link ConnectedDataStream}
+ */
+ def partitionBy(keyPosition1: Int, keyPosition2: Int): ConnectedDataStream[IN1, IN2] = {
+ javaStream.partitionBy(keyPosition1, keyPosition2)
+ }
+
+ /**
+ * PartitionBy operation for connected data stream. Partitions the elements of
+ * input1 and input2 according to keyPositions1 and keyPositions2.
+ *
+ * @param keyPositions1
+ * The fields used to partition the first input stream.
+ * @param keyPositions2
+ * The fields used to partition the second input stream.
+ * @return The partitioned { @link ConnectedDataStream}
+ */
+ def partitionBy(keyPositions1: Array[Int], keyPositions2: Array[Int]):
+ ConnectedDataStream[IN1, IN2] = {
+ javaStream.partitionBy(keyPositions1, keyPositions2)
+ }
+
+ /**
+ * PartitionBy operation for connected data stream using key expressions. Partitions
+ * the elements of input1 and input2 according to field1 and field2. A field
+ * expression is either the name of a public field or a getter method with
+ * parentheses of the {@link DataStream}s underlying type. A dot can be used
+ * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field1
+ * The partitioning expression for the first input
+ * @param field2
+ * The partitioning expression for the second input
+ * @return The partitioned { @link ConnectedDataStream}
+ */
+ def partitionBy(field1: String, field2: String): ConnectedDataStream[IN1, IN2] = {
+ javaStream.partitionBy(field1, field2)
+ }
+
+ /**
+ * PartitionBy operation for connected data stream using key expressions. Partitions
+ * the elements of input1 and input2 according to fields1 and fields2.
+ *
+ * @param fields1
+ * The partitioning expressions for the first input
+ * @param fields2
+ * The partitioning expressions for the second input
+ * @return The partitioned { @link ConnectedDataStream}
+ */
+ def partitionBy(fields1: Array[String], fields2: Array[String]):
+ ConnectedDataStream[IN1, IN2] = {
+ javaStream.partitionBy(fields1, fields2)
+ }
+
+ /**
+ * PartitionBy operation for connected data stream. Partitions the elements of
+ * input1 and input2 using fun1 and fun2.
+ *
+ * @param fun1
+ * The function used for partitioning the first input
+ * @param fun2
+ * The function used for partitioning the second input
+ * @return The partitioned { @link ConnectedDataStream}
+ */
+ def partitionBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
+ ConnectedDataStream[IN1, IN2] = {
+
+ val keyExtractor1 = new KeySelector[IN1, K] {
+ def getKey(in: IN1) = clean(fun1)(in)
+ }
+ val keyExtractor2 = new KeySelector[IN2, L] {
+ def getKey(in: IN2) = clean(fun2)(in)
+ }
+
+ javaStream.partitionBy(keyExtractor1, keyExtractor2)
+ }
+
+ /**
* Applies a reduce transformation on a {@link ConnectedDataStream} and maps
* the outputs to a common type. If the {@link ConnectedDataStream} is
* batched or windowed then the reduce transformation is applied on every
http://git-wip-us.apache.org/repos/asf/flink/blob/6b28bdf3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index c5b101e..92304ae 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -198,30 +198,26 @@ class DataStream[T](javaStream: JavaStream[T]) {
* Creates a new ConnectedDataStream by connecting
* DataStream outputs of different type with each other. The
* DataStreams connected using this operators can be used with CoFunctions.
- *
*/
def connect[T2](dataStream: DataStream[T2]): ConnectedDataStream[T, T2] =
javaStream.connect(dataStream.getJavaStream)
/**
* Groups the elements of a DataStream by the given key positions (for tuple/array types) to
- * be used with grouped operators like grouped reduce or grouped aggregations
- *
+ * be used with grouped operators like grouped reduce or grouped aggregations.
*/
def groupBy(fields: Int*): DataStream[T] = javaStream.groupBy(fields: _*)
/**
* Groups the elements of a DataStream by the given field expressions to
- * be used with grouped operators like grouped reduce or grouped aggregations
- *
+ * be used with grouped operators like grouped reduce or grouped aggregations.
*/
def groupBy(firstField: String, otherFields: String*): DataStream[T] =
javaStream.groupBy(firstField +: otherFields.toArray: _*)
/**
* Groups the elements of a DataStream by the given K key to
- * be used with grouped operators like grouped reduce or grouped aggregations
- *
+ * be used with grouped operators like grouped reduce or grouped aggregations.
*/
def groupBy[K: TypeInformation](fun: T => K): DataStream[T] = {
@@ -233,6 +229,32 @@ class DataStream[T](javaStream: JavaStream[T]) {
}
/**
+ * Partitions the elements of a DataStream by the given key positions (for tuple/array types)
+ * so that elements with the same key are directed to the same parallel operator instance.
+ */
+ def partitionBy(fields: Int*): DataStream[T] = javaStream.partitionBy(fields: _*)
+
+ /**
+ * Partitions the elements of a DataStream by the given field expressions
+ * so that elements with the same key are directed to the same parallel operator instance.
+ */
+ def partitionBy(firstField: String, otherFields: String*): DataStream[T] =
+ javaStream.partitionBy(firstField +: otherFields.toArray: _*)
+
+ /**
+ * Partitions the elements of a DataStream by the given K key
+ * so that elements with the same key are directed to the same parallel operator instance.
+ */
+ def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = {
+
+ val keyExtractor = new KeySelector[T, K] {
+ val cleanFun = clean(fun)
+ def getKey(in: T) = cleanFun(in)
+ }
+ javaStream.partitionBy(keyExtractor)
+ }
+
+ /**
* Sets the partitioning of the DataStream so that the output tuples
* are broadcasted to every parallel instance of the next component. This
* setting only effects the how the outputs will be distributed between the