You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/03 15:05:18 UTC

[2/4] flink git commit: [FLINK-2103] [streaming] [api-extending] PartitionBy for connected streams, scala & docs

[FLINK-2103] [streaming] [api-extending] PartitionBy for connected streams, scala & docs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b28bdf3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b28bdf3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b28bdf3

Branch: refs/heads/master
Commit: 6b28bdf359ba33cf106b96cf42fac0a431935330
Parents: a43e0d5
Author: mbalassi <mb...@apache.org>
Authored: Tue Jun 2 10:49:37 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed Jun 3 12:47:41 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    |  4 +-
 .../api/datastream/ConnectedDataStream.java     | 92 +++++++++++++++++++-
 .../streaming/api/datastream/DataStream.java    |  8 +-
 .../api/datastream/WindowedDataStream.java      |  2 +-
 .../api/scala/ConnectedDataStream.scala         | 88 ++++++++++++++++++-
 .../flink/streaming/api/scala/DataStream.scala  | 36 ++++++--
 6 files changed, 213 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6b28bdf3/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 543499c..e041e45 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -282,7 +282,9 @@ Usage: `dataStream.forward()`
 Usage: `dataStream.shuffle()`
  * *Distribute*: Distribute partitioning directs the output data stream to the next operator in a round-robin fashion, achieving a balanced distribution.
 Usage: `dataStream.distribute()`
- * *Field/Key*: Field/Key partitioning partitions the output data stream based on the hash code of a selected key of the tuples. Data points with the same key are directed to the same operator instance. 
+ * *Field/Key Partitioning*: Field/Key partitioning partitions the output data stream based on the hash code of a selected key of the tuples. Data points with the same key are directed to the same operator instance. 
+Usage: `dataStream.partitionBy(fields…)`
+ * *Field/Key Grouping*: Field/Key grouping takes partitioning one step further and separates the elements into disjoint groups based on the hash code. These groups are processed separately by the next downstream operator. 
 Usage: `dataStream.groupBy(fields…)`
  * *Broadcast*: Broadcast partitioning sends the output data stream to all parallel instances of the next operator.
 Usage: `dataStream.broadcast()`

http://git-wip-us.apache.org/repos/asf/flink/blob/6b28bdf3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 02db538..35418e0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -145,7 +145,7 @@ public class ConnectedDataStream<IN1, IN2> {
 	 * @param keyPosition2
 	 *            The field used to compute the hashcode of the elements in the
 	 *            second input stream.
-	 * @return @return The transformed {@link ConnectedDataStream}
+	 * @return The grouped {@link ConnectedDataStream}
 	 */
 	public ConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
 		return new ConnectedDataStream<IN1, IN2>(dataStream1.groupBy(keyPosition1),
@@ -162,7 +162,7 @@ public class ConnectedDataStream<IN1, IN2> {
 	 *            The fields used to group the first input stream.
 	 * @param keyPositions2
 	 *            The fields used to group the second input stream.
-	 * @return @return The transformed {@link ConnectedDataStream}
+	 * @return The grouped {@link ConnectedDataStream}
 	 */
 	public ConnectedDataStream<IN1, IN2> groupBy(int[] keyPositions1, int[] keyPositions2) {
 		return new ConnectedDataStream<IN1, IN2>(dataStream1.groupBy(keyPositions1),
@@ -175,7 +175,7 @@ public class ConnectedDataStream<IN1, IN2> {
 	 * expression is either the name of a public field or a getter method with
 	 * parentheses of the {@link DataStream}S underlying type. A dot can be used
 	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
-	 * 
+	 *
 	 * @param field1
 	 *            The grouping expression for the first input
 	 * @param field2
@@ -216,7 +216,7 @@ public class ConnectedDataStream<IN1, IN2> {
 	 *            The {@link KeySelector} used for grouping the first input
 	 * @param keySelector2
 	 *            The {@link KeySelector} used for grouping the second input
-	 * @return @return The transformed {@link ConnectedDataStream}
+	 * @return The grouped {@link ConnectedDataStream}
 	 */
 	public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1,
 			KeySelector<IN2, ?> keySelector2) {
@@ -225,6 +225,90 @@ public class ConnectedDataStream<IN1, IN2> {
 	}
 
 	/**
+	 * PartitionBy operation for connected data stream. Partitions the elements of
+	 * input1 and input2 according to keyPosition1 and keyPosition2.
+	 *
+	 * @param keyPosition1
+	 *            The field used to compute the hashcode of the elements in the
+	 *            first input stream.
+	 * @param keyPosition2
+	 *            The field used to compute the hashcode of the elements in the
+	 *            second input stream.
+	 * @return The partitioned {@link ConnectedDataStream}
+	 */
+	public ConnectedDataStream<IN1, IN2> partitionBy(int keyPosition1, int keyPosition2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(keyPosition1),
+				dataStream2.partitionBy(keyPosition2));
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream. Partitions the elements of
+	 * input1 and input2 according to keyPositions1 and keyPositions2.
+	 *
+	 * @param keyPositions1
+	 *            The fields used to partition the first input stream.
+	 * @param keyPositions2
+	 *            The fields used to partition the second input stream.
+	 * @return The partitioned {@link ConnectedDataStream}
+	 */
+	public ConnectedDataStream<IN1, IN2> partitionBy(int[] keyPositions1, int[] keyPositions2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(keyPositions1),
+				dataStream2.partitionBy(keyPositions2));
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream using key expressions. Partitions
+	 * the elements of input1 and input2 according to field1 and field2. A
+	 * field expression is either the name of a public field or a getter method
+	 * with parentheses of the {@link DataStream}s underlying type. A dot can be
+	 * used to drill down into objects, as in {@code "field1.getInnerField2()" }
+	 *
+	 * @param field1
+	 *            The partitioning expression for the first input
+	 * @param field2
+	 *            The partitioning expression for the second input
+	 * @return The partitioned {@link ConnectedDataStream}
+	 */
+	public ConnectedDataStream<IN1, IN2> partitionBy(String field1, String field2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(field1),
+				dataStream2.partitionBy(field2));
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream using key expressions. Partitions
+	 * the elements of input1 and input2 according to fields1 and fields2. A
+	 * field expression is either the name of a public field or a getter method
+	 * with parentheses of the {@link DataStream}s underlying type. A dot can be
+	 * used to drill down into objects, as in {@code "field1.getInnerField2()" }
+	 *
+	 * @param fields1
+	 *            The partitioning expressions for the first input
+	 * @param fields2
+	 *            The partitioning expressions for the second input
+	 * @return The partitioned {@link ConnectedDataStream}
+	 */
+	public ConnectedDataStream<IN1, IN2> partitionBy(String[] fields1, String[] fields2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(fields1),
+				dataStream2.partitionBy(fields2));
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream. Partitions the elements of
+	 * input1 and input2 using keySelector1 and keySelector2.
+	 *
+	 * @param keySelector1
+	 *            The {@link KeySelector} used for partitioning the first input
+	 * @param keySelector2
+	 *            The {@link KeySelector} used for partitioning the second input
+	 * @return The partitioned {@link ConnectedDataStream}
+	 */
+	public ConnectedDataStream<IN1, IN2> partitionBy(KeySelector<IN1, ?> keySelector1,
+												 KeySelector<IN2, ?> keySelector2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(keySelector1),
+				dataStream2.partitionBy(keySelector2));
+	}
+
+	/**
 	 * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
 	 * the output to a common type. The transformation calls a
 	 * {@link CoMapFunction#map1} for each element of the first input and

http://git-wip-us.apache.org/repos/asf/flink/blob/6b28bdf3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index db1f40f..7abd327 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -344,9 +344,7 @@ public class DataStream<OUT> {
 	 * @return The grouped {@link DataStream}
 	 **/
 	public GroupedDataStream<OUT> groupBy(String... fields) {
-
 		return groupBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
-
 	}
 
 	/**
@@ -383,7 +381,11 @@ public class DataStream<OUT> {
 	 *
 	 */
 	public DataStream<OUT> partitionBy(int... fields) {
-		return partitionBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
+		if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
+			return groupBy(new KeySelectorUtil.ArrayKeySelector<OUT>(fields));
+		} else {
+			return groupBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/6b28bdf3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 5d769ca..dd9af2f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -436,7 +436,7 @@ public class WindowedDataStream<OUT> {
 				.with(clean(reduceFunction));
 
 		// We get the windowbuffer and set it to emit empty windows with
-		// sequential IDs. This logic is necessarry to merge windows created in
+		// sequential IDs. This logic is necessary to merge windows created in
 		// parallel.
 		WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation).emitEmpty().sequentialID();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b28bdf3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
index 47d8fd2..ec2c6cc 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
@@ -233,7 +233,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * The function used for grouping the first input
    * @param fun2
    * The function used for grouping the second input
-   * @return @return The transformed { @link ConnectedDataStream}
+   * @return The grouped { @link ConnectedDataStream}
    */
   def groupBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
   ConnectedDataStream[IN1, IN2] = {
@@ -249,6 +249,92 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
   }
 
   /**
+   * PartitionBy operation for connected data stream. Partitions the elements of
+   * input1 and input2 according to keyPosition1 and keyPosition2.
+   *
+   * @param keyPosition1
+   * The field used to compute the hashcode of the elements in the
+   * first input stream.
+   * @param keyPosition2
+   * The field used to compute the hashcode of the elements in the
+   * second input stream.
+   * @return The partitioned { @link ConnectedDataStream}
+   */
+  def partitionBy(keyPosition1: Int, keyPosition2: Int): ConnectedDataStream[IN1, IN2] = {
+    javaStream.partitionBy(keyPosition1, keyPosition2)
+  }
+
+  /**
+   * PartitionBy operation for connected data stream. Partitions the elements of
+   * input1 and input2 according to keyPositions1 and keyPositions2.
+   *
+   * @param keyPositions1
+   * The fields used to partition the first input stream.
+   * @param keyPositions2
+   * The fields used to partition the second input stream.
+   * @return The partitioned { @link ConnectedDataStream}
+   */
+  def partitionBy(keyPositions1: Array[Int], keyPositions2: Array[Int]):
+  ConnectedDataStream[IN1, IN2] = {
+    javaStream.partitionBy(keyPositions1, keyPositions2)
+  }
+
+  /**
+   * PartitionBy operation for connected data stream using key expressions. Partitions
+   * the elements of input1 and input2 according to field1 and field2. A field
+   * expression is either the name of a public field or a getter method with
+   * parentheses of the {@link DataStream}S underlying type. A dot can be used
+   * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+   *
+   * @param field1
+   * The partitioning expression for the first input
+   * @param field2
+   * The partitioning expression for the second input
+   * @return The partitioned { @link ConnectedDataStream}
+   */
+  def partitionBy(field1: String, field2: String): ConnectedDataStream[IN1, IN2] = {
+    javaStream.partitionBy(field1, field2)
+  }
+
+  /**
+   * PartitionBy operation for connected data stream using key expressions. Partitions
+   * the elements of input1 and input2 according to fields1 and fields2.
+   *
+   * @param fields1
+   * The partitioning expressions for the first input
+   * @param fields2
+   * The partitioning expressions for the second input
+   * @return The partitioned { @link ConnectedDataStream}
+   */
+  def partitionBy(fields1: Array[String], fields2: Array[String]):
+  ConnectedDataStream[IN1, IN2] = {
+    javaStream.partitionBy(fields1, fields2)
+  }
+
+  /**
+   * PartitionBy operation for connected data stream. Partitions the elements of
+   * input1 and input2 using fun1 and fun2.
+   *
+   * @param fun1
+   * The function used for partitioning the first input
+   * @param fun2
+   * The function used for partitioning the second input
+   * @return The partitioned { @link ConnectedDataStream}
+   */
+  def partitionBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
+  ConnectedDataStream[IN1, IN2] = {
+
+    val keyExtractor1 = new KeySelector[IN1, K] {
+      def getKey(in: IN1) = clean(fun1)(in)
+    }
+    val keyExtractor2 = new KeySelector[IN2, L] {
+      def getKey(in: IN2) = clean(fun2)(in)
+    }
+
+    javaStream.partitionBy(keyExtractor1, keyExtractor2)
+  }
+
+  /**
    * Applies a reduce transformation on a {@link ConnectedDataStream} and maps
    * the outputs to a common type. If the {@link ConnectedDataStream} is
    * batched or windowed then the reduce transformation is applied on every

http://git-wip-us.apache.org/repos/asf/flink/blob/6b28bdf3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index c5b101e..92304ae 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -198,30 +198,26 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * Creates a new ConnectedDataStream by connecting
    * DataStream outputs of different type with each other. The
    * DataStreams connected using this operators can be used with CoFunctions.
-   *
    */
   def connect[T2](dataStream: DataStream[T2]): ConnectedDataStream[T, T2] = 
     javaStream.connect(dataStream.getJavaStream)
 
   /**
    * Groups the elements of a DataStream by the given key positions (for tuple/array types) to
-   * be used with grouped operators like grouped reduce or grouped aggregations
-   *
+   * be used with grouped operators like grouped reduce or grouped aggregations.
    */
   def groupBy(fields: Int*): DataStream[T] = javaStream.groupBy(fields: _*)
 
   /**
    * Groups the elements of a DataStream by the given field expressions to
-   * be used with grouped operators like grouped reduce or grouped aggregations
-   *
+   * be used with grouped operators like grouped reduce or grouped aggregations.
    */
   def groupBy(firstField: String, otherFields: String*): DataStream[T] = 
    javaStream.groupBy(firstField +: otherFields.toArray: _*)   
   
   /**
    * Groups the elements of a DataStream by the given K key to
-   * be used with grouped operators like grouped reduce or grouped aggregations
-   *
+   * be used with grouped operators like grouped reduce or grouped aggregations.
    */
   def groupBy[K: TypeInformation](fun: T => K): DataStream[T] = {
 
@@ -233,6 +229,32 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
+   * Partitions the elements of a DataStream by the given key positions (for tuple/array types),
+   * so that elements with the same key are directed to the same operator instance.
+   */
+  def partitionBy(fields: Int*): DataStream[T] = javaStream.partitionBy(fields: _*)
+
+  /**
+   * Partitions the elements of a DataStream by the given field expressions, so that
+   * elements with the same key are directed to the same operator instance.
+   */
+  def partitionBy(firstField: String, otherFields: String*): DataStream[T] =
+    javaStream.partitionBy(firstField +: otherFields.toArray: _*)
+
+  /**
+   * Partitions the elements of a DataStream by the given K key, so that
+   * elements with the same key are directed to the same operator instance.
+   */
+  def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = {
+
+    val keyExtractor = new KeySelector[T, K] {
+      val cleanFun = clean(fun)
+      def getKey(in: T) = cleanFun(in)
+    }
+    javaStream.partitionBy(keyExtractor)
+  }
+
+  /**
    * Sets the partitioning of the DataStream so that the output tuples
    * are broadcasted to every parallel instance of the next component. This
    * setting only effects the how the outputs will be distributed between the