You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/01/21 18:53:12 UTC

flink git commit: [FLINK-1367] [scala] [streaming] Field aggregations added to streaming scala api

Repository: flink
Updated Branches:
  refs/heads/release-0.8 8c8d86aaf -> eacbccf48


[FLINK-1367] [scala] [streaming] Field aggregations added to streaming scala api


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eacbccf4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eacbccf4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eacbccf4

Branch: refs/heads/release-0.8
Commit: eacbccf4824e28f8930b16767f9ec2f6296187c7
Parents: 8c8d86a
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Jan 7 16:30:09 2015 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Jan 21 16:09:23 2015 +0100

----------------------------------------------------------------------
 .../examples/windowing/TopSpeedWindowing.scala  |  2 +-
 .../flink/streaming/api/scala/DataStream.scala  | 65 ++++++++++++++++----
 .../api/scala/WindowedDataStream.scala          | 53 ++++++++++++++--
 .../flink/streaming/api/scala/package.scala     | 20 ++++++
 4 files changed, 120 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eacbccf4/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
index 7e9a8b2..333c950 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -50,7 +50,7 @@ object TopSpeedWindowing {
       .window(Time.of(evictionSec, SECONDS))
       .every(Delta.of[CarEvent](triggerMeters,
           (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0)))
-      .reduce((x, y) => if (x.speed > y.speed) x else y)
+      .maxBy("speed")
 
     cars print
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eacbccf4/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 520ce68..5fcf15f 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -138,7 +138,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   /**
    * Sets the partitioning of the DataStream so that the output values all go to 
    * the first instance of the next processing operator. Use this setting with care
-   * since it might cause a serious performance bottlenect in the application.
+   * since it might cause a serious performance bottleneck in the application.
    */
   def global: DataStream[T] = javaStream.global()
 
@@ -203,39 +203,78 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    */
   def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
-
+  
+  /**
+   * Applies an aggregation that gives the current maximum of the data stream at
+   * the given field.
+   *
+   */
+  def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
+  
   /**
    * Applies an aggregation that that gives the current minimum of the data stream at
    * the given position.
    *
    */
   def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
+  
+  /**
+   * Applies an aggregation that gives the current minimum of the data stream at
+   * the given field.
+   *
+   */
+  def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
 
   /**
    * Applies an aggregation that sums the data stream at the given position.
    *
    */
   def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
+  
+  /**
+   * Applies an aggregation that sums the data stream at the given field.
+   *
+   */
+  def sum(field: String): DataStream[T] =  aggregate(AggregationType.SUM, field)
 
   /**
    * Applies an aggregation that that gives the current minimum element of the data stream by
-   * the given position. When equality, the user can set to get the first or last element with
-   * the minimal value.
+   * the given position. On ties, the first element with the minimal value is returned.
+   *
+   */
+  def minBy(position: Int): DataStream[T] = aggregate(AggregationType
+    .MINBY, position)
+    
+   /**
+   * Applies an aggregation that gives the current minimum element of the data stream by
+   * the given field. On ties, the first element with the minimal value is returned.
    *
    */
-  def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType
-    .MINBY, position, first)
+  def minBy(field: String): DataStream[T] = aggregate(AggregationType
+    .MINBY, field )
 
-  /**
+   /**
+   * Applies an aggregation that gives the current maximum element of the data stream by
+   * the given position. On ties, the first element with the maximal value is returned.
+   *
+   */
+  def maxBy(position: Int): DataStream[T] =
+    aggregate(AggregationType.MAXBY, position)
+    
+   /**
    * Applies an aggregation that that gives the current maximum element of the data stream by
-   * the given position. When equality, the user can set to get the first or last element with
-   * the maximal value.
+   * the given field. On ties, the first element with the maximal value is returned.
    *
    */
-  def maxBy(position: Int, first: Boolean = true): DataStream[T] =
-    aggregate(AggregationType.MAXBY, position, first)
+  def maxBy(field: String): DataStream[T] =
+    aggregate(AggregationType.MAXBY, field)
+    
+  private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
+    val position = fieldNames2Indices(javaStream.getType(), Array(field))(0)
+    aggregate(aggregationType, position)
+  }
 
-  private def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true):
+  private def aggregate(aggregationType: AggregationType, position: Int):
     DataStream[T] = {
 
     val jStream = javaStream.asInstanceOf[JavaStream[Product]]
@@ -246,7 +285,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
     val reducer = aggregationType match {
       case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position).
         getTypeClass()));
-      case _ => new agg.ProductComparableAggregator(aggregationType, first)
+      case _ => new agg.ProductComparableAggregator(aggregationType, true)
     }
 
     val invokable = jStream match {

http://git-wip-us.apache.org/repos/asf/flink/blob/eacbccf4/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
index 8f6fcbb..211ad9f 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
@@ -157,6 +157,13 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
    *
    */
   def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
+  
+  /**
+   * Applies an aggregation that gives the maximum of the elements in the window at
+   * the given field.
+   *
+   */
+  def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
 
   /**
    * Applies an aggregation that that gives the minimum of the elements in the window at
@@ -164,30 +171,64 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
    *
    */
   def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
+  
+  /**
+   * Applies an aggregation that gives the minimum of the elements in the window at
+   * the given field.
+   *
+   */
+  def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
 
   /**
    * Applies an aggregation that sums the elements in the window at the given position.
    *
    */
   def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
+  
+  /**
+   * Applies an aggregation that sums the elements in the window at the given field.
+   *
+   */
+  def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field)
 
   /**
    * Applies an aggregation that that gives the maximum element of the window by
    * the given position. When equality, returns the first.
    *
    */
-  def maxBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MAXBY,
-    position, first)
+  def maxBy(position: Int): DataStream[T] = aggregate(AggregationType.MAXBY,
+    position)
+    
+  /**
+   * Applies an aggregation that gives the maximum element of the window by
+   * the given field. On ties, returns the first.
+   *
+   */
+  def maxBy(field: String): DataStream[T] = aggregate(AggregationType.MAXBY,
+    field)
 
   /**
    * Applies an aggregation that that gives the minimum element of the window by
    * the given position. When equality, returns the first.
    *
    */
-  def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MINBY,
-    position, first)
+  def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY,
+    position)
+    
+   /**
+   * Applies an aggregation that gives the minimum element of the window by
+   * the given field. On ties, returns the first.
+   *
+   */
+  def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY,
+    field)
+    
+  private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
+    val position = fieldNames2Indices(javaStream.getType(), Array(field))(0)
+    aggregate(aggregationType, position)
+  }  
 
-  def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true):
+  def aggregate(aggregationType: AggregationType, position: Int):
   DataStream[T] = {
 
     val jStream = javaStream.asInstanceOf[JavaWStream[Product]]
@@ -198,7 +239,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
     val reducer = aggregationType match {
       case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(
         outType.getTypeAt(position).getTypeClass()));
-      case _ => new agg.ProductComparableAggregator(aggregationType, first)
+      case _ => new agg.ProductComparableAggregator(aggregationType, true)
     }
 
     new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]]

http://git-wip-us.apache.org/repos/asf/flink/blob/eacbccf4/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index f4c6bcf..9d7c36e 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -43,4 +43,24 @@ package object scala {
 
   implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]): 
   ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream)
+
+   private[flink] def fieldNames2Indices(
+      typeInfo: TypeInformation[_],
+      fields: Array[String]): Array[Int] = {
+    typeInfo match {
+      case ti: CaseClassTypeInfo[_] =>
+        val result = ti.getFieldIndices(fields)
+
+        if (result.contains(-1)) {
+          throw new IllegalArgumentException("Fields '" + fields.mkString(", ") +
+            "' are not valid for '" + ti.toString + "'.")
+        }
+
+        result
+
+      case _ =>
+        throw new UnsupportedOperationException("Specifying fields by name is only " +
+          "supported on Case Classes (for now).")
+    }
+  }
 }