Posted to reviews@spark.apache.org by setjet <gi...@git.apache.org> on 2017/05/25 21:12:23 UTC

[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

GitHub user setjet opened a pull request:

    https://github.com/apache/spark/pull/18113

    [SPARK-20890][SQL] Added min and max typed aggregation functions

    ## What changes were proposed in this pull request?
    Typed min and max aggregation functions are missing for Datasets. They are already supported for DataFrames and should therefore also be part of the Dataset API.
    
    Please note that it is OK for the min and max functions to initialize the aggregation buffer with MAX and MIN sentinel values respectively, because only keys that actually appear in the data are returned.
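    
    As a rough illustration, the intended usage looks like this (a hypothetical session mirroring the added unit tests; assumes `spark.implicits._` is in scope for `toDS()`):
    
        import org.apache.spark.sql.expressions.scalalang.typed
    
        val ds = Seq("a" -> 1, "a" -> 3, "b" -> 4).toDS()
        ds.groupByKey(_._1).agg(typed.min(_._2), typed.max(_._2))
        // "a" -> (min 1.0, max 3.0), "b" -> (min 4.0, max 4.0)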
    
    ## How was this patch tested?
    Added some corresponding unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/setjet/spark spark-20890

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18113.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #18113
    
----
commit d7159930d10cff73fb838e51e9971e9857911a5c
Author: setjet <ru...@gmail.com>
Date:   2017-05-25T21:08:04Z

    added min and max typed aggregation functions

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118812028
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -56,7 +55,6 @@ class TypedSumLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long] {
     
       // Java api support
       def this(f: MapFunction[IN, java.lang.Long]) = this(x => f.call(x).asInstanceOf[Long])
    -
    --- End diff --
    
    nit: ditto




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r147559942
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -26,43 +26,64 @@ import org.apache.spark.sql.expressions.Aggregator
     // This file defines internal implementations for aggregators.
     ////////////////////////////////////////////////////////////////////////////////////////////////////
     
    +class TypedSumDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, java.lang.Double, java.lang.Double] {
    +
    +  override def zero: java.lang.Double = null
    +  override def reduce(b: java.lang.Double, a: IN): java.lang.Double =
    --- End diff --
    
    OK, let's follow the SQL semantics and deal with the performance problem later. @setjet sorry for the late reply.




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r119996472
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -95,7 +93,123 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
     
       // Java api support
       def this(f: MapFunction[IN, java.lang.Double]) = this(x => f.call(x).asInstanceOf[Double])
    +
       def toColumnJava: TypedColumn[IN, java.lang.Double] = {
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, java.lang.Double, java.lang.Double] {
    +
    +  override def zero: java.lang.Double = null
    --- End diff --
    
    @cloud-fan do you agree with this?




[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    **[Test build #84509 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84509/testReport)** for PR 18113 at commit [`ce6a7bf`](https://github.com/apache/spark/commit/ce6a7bfc9201e50ca10290e903e859890ef4fad4).




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118821260
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +97,67 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    --- End diff --
    
    We can apply an aggregate on a dataset without groupBy. E.g.,
    
        val ds = Seq.empty[(Int, Int)].toDS
        ds.agg(typed.sum((x: (Int, Int)) => x._2)).show()
    
    In this case, it seems this `TypedMinDouble` will return the initial value from `zero`. I've not tried it, but it looks like it will return `Double.PositiveInfinity`. Besides being inconsistent with `aggregate.Min`, getting positive infinity here can't be a correct result either.
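    
    For comparison, the untyped aggregate returns `null` for the same empty input (illustrative output, assuming `org.apache.spark.sql.functions._` is imported):
    
        ds.agg(min($"_2")).show()
        // +-------+
        // |min(_2)|
        // +-------+
        // |   null|
        // +-------+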




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118812022
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +97,67 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    +  override def reduce(b: Double, a: IN): Double = if (b < f(a)) b else f(a)
    --- End diff --
    
    `math.min(b, f(a))`, similar to other places.




[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r150388500
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -26,8 +26,9 @@ import org.apache.spark.sql.expressions.Aggregator
     // This file defines internal implementations for aggregators.
     ////////////////////////////////////////////////////////////////////////////////////////////////////
     
    +class TypedSumDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, Double, Double] {
     
    -class TypedSumDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    --- End diff --
    
    let's avoid this unnecessary change.




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r154341528
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +94,91 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    +  override def reduce(b: Double, a: IN): Double = math.min(b, f(a))
    +  override def merge(b1: Double, b2: Double): Double = math.min(b1, b2)
    +  override def finish(reduction: Double): Double = {
    +    if (Double.PositiveInfinity == reduction) {
    --- End diff --
    
    +1 for option 1. It sounds the best among the three to me too.




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r154887281
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -17,8 +17,10 @@
     
     package org.apache.spark.sql.execution.aggregate
     
    +import java.lang
    --- End diff --
    
    usually we do `import java.lang.{Long => JLong}`
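    
    For example, a minimal sketch of the aliasing convention (not the PR's exact code):
    
    ```
    import java.lang.{Long => JLong}
    
    object Example {
      // the alias keeps boxed types readable in signatures
      def finish(reduction: JLong): JLong = reduction
    }
    ```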




[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    BTW let's not change the existing typed sum; it already follows the mathematical standard as we expected.




[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    Can one of the admins verify this patch?




[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    **[Test build #84508 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84508/testReport)** for PR 18113 at commit [`e37eec7`](https://github.com/apache/spark/commit/e37eec784584800668aa46cec09052f681edb167).




[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    **[Test build #84509 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84509/testReport)** for PR 18113 at commit [`ce6a7bf`](https://github.com/apache/spark/commit/ce6a7bfc9201e50ca10290e903e859890ef4fad4).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r155070641
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +96,165 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, MutableDouble, java.lang.Double] {
    +  override def zero: MutableDouble = null
    +  override def reduce(b: MutableDouble, a: IN): MutableDouble = {
    +    if (b == null) {
    +      new MutableDouble(f(a))
    +    } else {
    +      b.value = math.min(b.value, f(a))
    +      b
    +    }
    +  }
    +  override def merge(b1: MutableDouble, b2: MutableDouble): MutableDouble = {
    +    if (b1 == null) {
    +      b2
    +    } else if (b2 == null) {
    +      b1
    +    } else {
    +      b1.value = math.min(b1.value, b2.value)
    +      b1
    +    }
    +  }
    +  override def finish(reduction: MutableDouble): java.lang.Double = {
    +    if (reduction == null) {
    +      null
    +    } else {
    +      reduction.toJavaDouble
    +    }
    +  }
    +
    +  override def bufferEncoder: Encoder[MutableDouble] = Encoders.kryo[MutableDouble]
    +  override def outputEncoder: Encoder[java.lang.Double] = ExpressionEncoder[java.lang.Double]()
    +
    +  // Java api support
    +  def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x))
    +  def toColumnScala: TypedColumn[IN, Double] = {
    --- End diff --
    
    @cloud-fan I agree that's the best option. Made some slight changes, but it's implemented now.
    There is one issue, however, that I am stuck on: the tests for empty sets ("typed aggregate: empty") seem to be decoding the options to nulls, resulting in the following:
    
    Decoded objects do not match expected objects:
    expected: WrappedArray([0.0,0,NaN,None,None,None,None])
    actual:   WrappedArray([0.0,0,NaN,[null],[null],[null],[null]])
    
    This doesn't happen with non-empty datasets. Do you have any clue?





[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r153021722
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +94,91 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    +  override def reduce(b: Double, a: IN): Double = math.min(b, f(a))
    +  override def merge(b1: Double, b2: Double): Double = math.min(b1, b2)
    +  override def finish(reduction: Double): Double = {
    +    if (Double.PositiveInfinity == reduction) {
    --- End diff --
    
    Since it's hard to tell whether the input was empty or consisted entirely of `Double.PositiveInfinity`, my new proposal:
    ```
    class MutableLong(var value: Long) extends Serializable
    
    class TypedMinLong[IN](val f: IN => Long) extends Aggregator[IN, MutableLong, java.lang.Long] {
      override def zero: MutableLong = null
      override def reduce(b: MutableLong, a: IN): MutableLong = {
        if (b == null) {
          new MutableLong(f(a))
        } else {
          b.value = math.min(b.value, f(a))
          b
        }
      }
      override def merge(b1: MutableLong, b2: MutableLong): MutableLong = {
        if (b1 == null) {
          b2
        } else if (b2 == null) {
          b1
        } else {
          b1.value = math.min(b1.value, b2.value)
          b1
        }
      }
      override def finish(reduction: MutableLong): java.lang.Long = {
        if (reduction == null) {
          null
        } else {
          reduction.value
        }
      }
    }
    ```




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r150391079
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -76,26 +76,126 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
     
       // Java api support
       def this(f: MapFunction[IN, Object]) = this((x: IN) => f.call(x).asInstanceOf[Any])
    +  
       def toColumnJava: TypedColumn[IN, java.lang.Long] = {
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
       }
     }
     
    +class TypedAverage[IN](val f: IN => Double)
    +  extends Aggregator[IN, (Double, Long), Double] {
     
    -class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long), Double] {
       override def zero: (Double, Long) = (0.0, 0L)
       override def reduce(b: (Double, Long), a: IN): (Double, Long) = (f(a) + b._1, 1 + b._2)
    -  override def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2
    --- End diff --
    
    ditto




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r153021545
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +94,91 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    +  override def reduce(b: Double, a: IN): Double = math.min(b, f(a))
    +  override def merge(b1: Double, b2: Double): Double = math.min(b1, b2)
    +  override def finish(reduction: Double): Double = {
    +    if (Double.PositiveInfinity == reduction) {
    --- End diff --
    
    That's correct. That was part of the discussion above. We used to initialize it with null, so that we could then distinguish between these cases. As you can read above, that initial proposal was tossed as it didn't meet ANSI standards.
    Another option I just realised would be to initialize it with Double.NaN, and then use that as a flag to distinguish between infinity and the initial value. Then again, that would not be supported for longs, as we cannot assign a NaN to a Long.
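    
    (A quick caveat to the NaN idea, untested here: `math.min`/`math.max` propagate NaN, so a genuine NaN in the data would be indistinguishable from the flag:)
    
        math.min(Double.NaN, 1.0)  // NaN - java.lang.Math.min propagates NaN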




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r155068709
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -17,8 +17,10 @@
     
     package org.apache.spark.sql.execution.aggregate
     
    +import java.lang
    --- End diff --
    
    Resolved




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118840602
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +97,67 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    --- End diff --
    
    Using Option as the zero value for the aggregator might be an option. I'm not saying it doesn't make sense; I'm just saying it seems to me we don't need a huge refactoring.
    
    The zero values the typed aggregators return for an empty dataset are mostly inconsistent with the aggregate expressions.




[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    An empty set's min and max are defined as +infinity and -infinity respectively: https://en.wikipedia.org/wiki/Empty_set
    This is supported for Java doubles, but not for longs. We could instead use Long.MAX and Long.MIN values?
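    
    For reference, the candidate sentinels under that convention (a sketch):
    
        Double.PositiveInfinity  // min over an empty set of doubles
        Double.NegativeInfinity  // max over an empty set of doubles
        Long.MaxValue            // closest analogue for min over longs
        Long.MinValue            // closest analogue for max over longs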




[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    **[Test build #84508 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84508/testReport)** for PR 18113 at commit [`e37eec7`](https://github.com/apache/spark/commit/e37eec784584800668aa46cec09052f681edb167).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `trait TypedMaxDouble[IN, OUT] extends Aggregator[IN, MutableDouble, OUT] `




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r153024799
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +94,91 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    +  override def reduce(b: Double, a: IN): Double = math.min(b, f(a))
    +  override def merge(b1: Double, b2: Double): Double = math.min(b1, b2)
    +  override def finish(reduction: Double): Double = {
    +    if (Double.PositiveInfinity == reduction) {
    --- End diff --
    
    Ok, makes sense. What about the ```finish``` return type? Leaving that as a Java type would cause the ```this``` and ```toColumnJava``` to be flipped, creating a ```toColumnScala``` instead.
    What about:
    ```
    override def finish(reduction: java.lang.Double): Double = reduction
    ```
    As it's in ```finish```, it shouldn't cause much performance overhead, as it's not executed many times. It would also reduce complexity a bit.




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r150391063
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -44,8 +44,9 @@ class TypedSumDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Dou
       }
     }
     
    +class TypedSumLong[IN](val f: IN => Long)
    +  extends Aggregator[IN, Long, Long] {
    --- End diff --
    
    unnecessary change.




[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r154889195
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +96,165 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, MutableDouble, java.lang.Double] {
    +  override def zero: MutableDouble = null
    +  override def reduce(b: MutableDouble, a: IN): MutableDouble = {
    +    if (b == null) {
    +      new MutableDouble(f(a))
    +    } else {
    +      b.value = math.min(b.value, f(a))
    +      b
    +    }
    +  }
    +  override def merge(b1: MutableDouble, b2: MutableDouble): MutableDouble = {
    +    if (b1 == null) {
    +      b2
    +    } else if (b2 == null) {
    +      b1
    +    } else {
    +      b1.value = math.min(b1.value, b2.value)
    +      b1
    +    }
    +  }
    +  override def finish(reduction: MutableDouble): java.lang.Double = {
    +    if (reduction == null) {
    +      null
    +    } else {
    +      reduction.toJavaDouble
    +    }
    +  }
    +
    +  override def bufferEncoder: Encoder[MutableDouble] = Encoders.kryo[MutableDouble]
    +  override def outputEncoder: Encoder[java.lang.Double] = ExpressionEncoder[java.lang.Double]()
    +
    +  // Java api support
    +  def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x))
    +  def toColumnScala: TypedColumn[IN, Double] = {
    --- End diff --
    
    seems it's hard to achieve, how about
    ```
    abstract class TypedMinDouble[IN, OUT](val f: IN => Double) extends Aggregator[IN, MutableDouble, OUT] {
      def zero = ...
      def reduce = ...
    }
    
    class JavaTypedMinDouble[IN](f: IN => Double) extends TypedMinDouble[IN, JDouble](f) {
      def finish: JDouble = ...
    }
    
    class ScalaTypedMinDouble[IN](f: IN => Double) extends TypedMinDouble[IN, Option[Double]](f) {
      def finish: Option[Double] = ...
    }
    ```




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118841520
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +97,67 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    --- End diff --
    
    Currently, I think the simplest fix is to use `java.lang.Double` as the buffer and output type, so we can use `null` as the zero value and make `TypedMinDouble` align with `aggregate.Min` for this corner case.
    
    I am not sure if there is any negative effect of doing this. cc @cloud-fan for comments.




[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    Exactly my point. I'll return -/+ inf then for doubles only, and min/max values for longs.





[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    can you check the diff and revert all the unnecessary changes? thanks!




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r155551168
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala ---
    @@ -263,6 +262,25 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
           ("a", 4), ("b", 3))
       }
     
    +  test("typed aggregate: min, max") {
    +    val ds = Seq("a" -> 1, "a" -> 3, "b" -> 4, "b" -> -4, "b" -> 0).toDS()
    +    checkDataset(
    +      ds.groupByKey(_._1).agg(
    +        typed.min(_._2), typed.minLong(_._2), typed.max(_._2), typed.maxLong(_._2)),
    +      ("a", Some(1.0), Some(1L), Some(3.0), Some(3L)),
    +      ("b", Some(-4.0), Some(-4L), Some(4.0), Some(4L)))
    +  }
    +
    +  test("typed aggregate: empty") {
    +    val empty = Seq.empty[(Double, Double)].toDS
    +    val f = (x: (Double, Double)) => x._2
    +    val g = (x: (Long, Long)) => x._2
    +    checkDataset(
    +      empty.agg(typed.sum(f), typed.sumLong(g), typed.avg(f),
    --- End diff --
    
    The problem is, `empty.agg` is a relational aggregation: it actually calls `groupBy().agg`, so the returned type is `Row` instead of Scala objects.
    
    I just found there is no global aggregate for typed aggregation; maybe we need to add that first.
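    
    In other words (an illustrative sketch of the failure mode, not verified code):
    
    ```
    val empty = Seq.empty[(Double, Double)].toDS
    // Dataset.agg goes through the untyped groupBy().agg path, which yields Rows,
    // so an Option[Double] result is re-encoded as a struct and shows up as
    // [null] rather than None.
    empty.agg(typed.min((x: (Double, Double)) => x._2))
    ```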




[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    Doubles have -inf and +inf, can we use those?




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r150381736
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -26,43 +26,64 @@ import org.apache.spark.sql.expressions.Aggregator
     // This file defines internal implementations for aggregators.
     ////////////////////////////////////////////////////////////////////////////////////////////////////
     
    +class TypedSumDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, java.lang.Double, java.lang.Double] {
    +
    +  override def zero: java.lang.Double = 0.0
    +  override def reduce(b: java.lang.Double, a: IN): java.lang.Double =
    --- End diff --
    
    As discussed previously, the boxing is needed to have appropriate return types for min/max. This of course would not be needed if we align it with the current (incorrect) return values.
    
    I have bounced back and forth between the return values multiple times now, so it might be worthwhile to have some more discussion.




[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84472/
    Test FAILed.




[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84175/
    Test FAILed.




[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    **[Test build #84507 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84507/testReport)** for PR 18113 at commit [`c197cb1`](https://github.com/apache/spark/commit/c197cb1a930db390979f9926562347802280d1ea).




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r154296716
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +94,91 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    +  override def reduce(b: Double, a: IN): Double = math.min(b, f(a))
    +  override def merge(b1: Double, b2: Double): Double = math.min(b1, b2)
    +  override def finish(reduction: Double): Double = {
    +    if (Double.PositiveInfinity == reduction) {
    --- End diff --
    
    Oh, I don't know why I missed this cc. Will check through related stuff and be back soon.




[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84509/
    Test FAILed.




[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    @cloud-fan could you have a look please?




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r119147237
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -95,7 +93,123 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
     
       // Java api support
       def this(f: MapFunction[IN, java.lang.Double]) = this(x => f.call(x).asInstanceOf[Double])
    +
       def toColumnJava: TypedColumn[IN, java.lang.Double] = {
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, java.lang.Double, java.lang.Double] {
    +
    +  override def zero: java.lang.Double = null
    --- End diff --
    
    doesn't the `TypedSum` have the same problem?




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r150391109
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -76,26 +76,126 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
     
       // Java api support
       def this(f: MapFunction[IN, Object]) = this((x: IN) => f.call(x).asInstanceOf[Any])
    +  
       def toColumnJava: TypedColumn[IN, java.lang.Long] = {
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
       }
     }
     
    +class TypedAverage[IN](val f: IN => Double)
    +  extends Aggregator[IN, (Double, Long), Double] {
     
    -class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long), Double] {
       override def zero: (Double, Long) = (0.0, 0L)
       override def reduce(b: (Double, Long), a: IN): (Double, Long) = (f(a) + b._1, 1 + b._2)
    -  override def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2
    -  override def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) = {
    +  override def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) =
         (b1._1 + b2._1, b1._2 + b2._2)
    -  }
    +  override def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2
     
       override def bufferEncoder: Encoder[(Double, Long)] = ExpressionEncoder[(Double, Long)]()
       override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
     
       // Java api support
       def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])
    +
    +  def toColumnJava: TypedColumn[IN, java.lang.Double] = {
    +    toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
    +  }
    +}
    +
    +class TypedMinDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, Double, Double] {
    --- End diff --
    
    one line please




[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    It seems to me that the behavior of those aggregators on an empty dataset might be different from `min` and `max`.




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r150381615
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -26,43 +26,64 @@ import org.apache.spark.sql.expressions.Aggregator
     // This file defines internal implementations for aggregators.
     ////////////////////////////////////////////////////////////////////////////////////////////////////
     
    +class TypedSumDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, java.lang.Double, java.lang.Double] {
    +
    +  override def zero: java.lang.Double = 0.0
    +  override def reduce(b: java.lang.Double, a: IN): java.lang.Double =
    --- End diff --
    
    I'm -1 on this change; it introduces a lot of overhead because of boxing. Can we revert it and use `toColumnJava`?




[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84508/
    Test FAILed.




[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    ok to test




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r150388516
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -76,26 +77,130 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
     
       // Java api support
       def this(f: MapFunction[IN, Object]) = this((x: IN) => f.call(x).asInstanceOf[Any])
    +  
       def toColumnJava: TypedColumn[IN, java.lang.Long] = {
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
       }
     }
     
    +class TypedAverage[IN](val f: IN => Double)
    +  extends Aggregator[IN, (Double, Long), Double] {
     
    -class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long), Double] {
       override def zero: (Double, Long) = (0.0, 0L)
       override def reduce(b: (Double, Long), a: IN): (Double, Long) = (f(a) + b._1, 1 + b._2)
    -  override def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2
    -  override def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) = {
    +  override def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) =
         (b1._1 + b2._1, b1._2 + b2._2)
    -  }
    +  override def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2
     
       override def bufferEncoder: Encoder[(Double, Long)] = ExpressionEncoder[(Double, Long)]()
       override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
     
       // Java api support
       def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])
    +
       def toColumnJava: TypedColumn[IN, java.lang.Double] = {
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, Double, Double] {
    +
    +  override def zero: Double = Double.MaxValue
    +  override def reduce(b: Double, a: IN): Double = math.min(b, f(a))
    +  override def merge(b1: Double, b2: Double): Double = math.min(b1, b2)
    +  override def finish(reduction: Double): Double = {
    +    if (Double.MaxValue == reduction) {
    +      Double.NegativeInfinity
    +    }
    +    else {
    --- End diff --
    
    nit:
    ```
    if {
    } else {
    }
    ```




[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    @setjet Are you still working on this PR?




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r154172794
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +94,91 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    +  override def reduce(b: Double, a: IN): Double = math.min(b, f(a))
    +  override def merge(b1: Double, b2: Double): Double = math.min(b1, b2)
    +  override def finish(reduction: Double): Double = {
    +    if (Double.PositiveInfinity == reduction) {
    --- End diff --
    
    Which of those 3 will we decide on then? None of them is ideal, unfortunately.
    
    cc @cloud-fan @gatorsmile @HyukjinKwon @srowen
    





[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    Sounds good. I think the deal is, typed sum/count/max/min should follow the mathematical standard instead of SQL.




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118840819
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +97,67 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    --- End diff --
    
    Btw, if you mean to make the return type (`OUT`) `Option[Double]`, I don't think that makes sense.




[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118798645
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala ---
    @@ -77,6 +77,34 @@ object typed {
        */
       def sumLong[IN](f: IN => Long): TypedColumn[IN, Long] = new TypedSumLong[IN](f).toColumn
     
    +  /**
    +    * Min aggregate function for floating point (double) type.
    +    *
    +    * @since 2.3.0
    +    */
    --- End diff --
    
    https://github.com/databricks/scala-style-guide#documentation-style
    
    In Spark, we don't use the ScalaDoc style. Please correct all the descriptions by using the multi-line JavaDoc comment style.
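
    For illustration, the difference is only the asterisk alignment (a minimal sketch; the `def` signatures are placeholders):

    ```
    /**
     * Multi-line JavaDoc comment style: asterisks aligned in a single column.
     * This is the Spark convention.
     */
    def javaDocStyle(): Unit = ()

    /**
      * ScalaDoc comment style: asterisks indented by two extra spaces.
      * This is what the diff above uses and should be corrected.
      */
    def scalaDocStyle(): Unit = ()
    ```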


---


[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118843548
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +97,67 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    --- End diff --
    
    @viirya I just had a go at your suggestion, but it seems to be more complicated than anticipated. Spark performs some implicit casts (I think as part of Catalyst) between `java.lang.Double` and `scala.Double`, causing a NullPointerException:
    `java.lang.NullPointerException at scala.Predef$.Double2double(Predef.scala:365)`
    I am not sure this approach is feasible.
    
    Sample of the `merge` function:
    `override def merge(b1: java.lang.Double, b2: java.lang.Double): java.lang.Double = java.lang.Math.min(b1, b2)`
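
    A minimal reproduction outside Spark (assumption: plain Scala, no Spark involved). `java.lang.Math.min` takes primitive doubles, so the boxed null is implicitly unboxed via `Predef.Double2double` and throws:

    ```
    val b1: java.lang.Double = null   // e.g. the `zero` buffer before any input
    val b2: java.lang.Double = 1.0
    // Throws java.lang.NullPointerException at scala.Predef$.Double2double
    java.lang.Math.min(b1, b2)
    ```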


---


[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    **[Test build #84472 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84472/testReport)** for PR 18113 at commit [`20216e4`](https://github.com/apache/spark/commit/20216e4cda40b75b98152954d1055cf062f818f0).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class TypedMinDouble[IN](val f: IN => Double)`
      * `class TypedMaxDouble[IN](val f: IN => Double)`
      * `class TypedMinLong[IN](val f: IN => Long) extends Aggregator[IN, MutableLong, java.lang.Long] `
      * `class TypedMaxLong[IN](val f: IN => Long) extends Aggregator[IN, MutableLong, java.lang.Long] `
      * `class MutableLong(var value: Long) extends Serializable `
      * `class MutableDouble(var value: Double) extends Serializable `


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r153020749
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +94,91 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    +  override def reduce(b: Double, a: IN): Double = math.min(b, f(a))
    +  override def merge(b1: Double, b2: Double): Double = math.min(b1, b2)
    +  override def finish(reduction: Double): Double = {
    +    if (Double.PositiveInfinity == reduction) {
    --- End diff --
    
    If the input is a single `Double.PositiveInfinity`, will we return `Double.NegativeInfinity` as the result?
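
    A quick sketch of the collision (plain Scala mirroring the `zero`/`reduce`/`finish` logic above):

    ```
    val zero = Double.PositiveInfinity
    val input = Seq(Double.PositiveInfinity)         // legitimate, non-empty input
    val reduction = input.foldLeft(zero)(math.min)   // still PositiveInfinity
    val result =
      if (reduction == Double.PositiveInfinity) Double.NegativeInfinity else reduction
    // result == NegativeInfinity, indistinguishable from the empty-input case
    ```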


---



[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84507/
    Test FAILed.


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r119175369
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -95,7 +93,123 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
     
       // Java api support
       def this(f: MapFunction[IN, java.lang.Double]) = this(x => f.call(x).asInstanceOf[Double])
    +
       def toColumnJava: TypedColumn[IN, java.lang.Double] = {
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, java.lang.Double, java.lang.Double] {
    +
    +  override def zero: java.lang.Double = null
    --- End diff --
    
    `TypedSum` is actually correct because it will return 0 in case of an empty set ['In mathematics, an empty sum, or nullary sum, is a summation where the number of terms is zero. By convention,[1] the value of any empty sum of numbers is the additive identity, zero.'](https://en.wikipedia.org/wiki/Empty_sum). One could therefore argue that `Sum.scala` is actually wrong because it returns null for `emptyTestData.agg(sum('key))`. We could either fix `Sum.scala`, although that might affect existing applications, or align both to return null, even though that is not technically correct.
    The same goes for `TypedAvg`, which returns `Double.NaN` instead of null.
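
    The two conventions side by side (a sketch, assuming a SparkSession with `spark.implicits._` in scope; a `Dataset[Int]`'s single column is named `value`):

    ```
    import org.apache.spark.sql.functions.sum

    Seq.empty[Int].sum                        // Scala collections: 0, the empty sum
    Seq.empty[Int].toDS().agg(sum("value"))   // Spark SQL: a single row containing null
    ```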



---


[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118838177
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +97,67 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    --- End diff --
    
    Currently it seems the behavior of the aggregation expressions is more reasonable. I am not sure whether we considered this corner case when we implemented those aggregators.
    
    It seems to me that we can fix this inconsistency by just modifying those aggregators, and we don't need a huge refactoring for this.



---


[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    So the existing typed sum follows the mathematical standard, not the SQL standard. Do we have a mathematical standard for empty max/min?
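
    For reference, the order-theoretic convention over the extended reals is

        \inf \emptyset = +\infty, \qquad \sup \emptyset = -\infty

    i.e. exactly the `zero` values these aggregators start from: well defined mathematically, but arguably surprising as a user-facing result.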


---



[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    @setjet, mind updating this please?


---



[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    There is one issue, however, that I am stuck on: the tests for empty sets ("typed aggregate: empty") seem to be decoding the options to nulls, resulting in the following:
    
    Decoded objects do not match expected objects:
    expected: WrappedArray([0.0,0,NaN,None,None,None,None])
    actual: WrappedArray([0.0,0,NaN,[null],[null],[null],[null]])
    
    This doesn't happen with non-empty datasets. Does anyone have a clue?


---



[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    @cloud-fan done; some small whitespace changes remain, as they format the functions within the file consistently


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118821329
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +97,67 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    --- End diff --
    
    Ah yes, you're right, let me have a look


---


[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r121761524
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -26,43 +26,64 @@ import org.apache.spark.sql.expressions.Aggregator
     // This file defines internal implementations for aggregators.
     ////////////////////////////////////////////////////////////////////////////////////////////////////
     
    +class TypedSumDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, java.lang.Double, java.lang.Double] {
    +
    +  override def zero: java.lang.Double = null
    +  override def reduce(b: java.lang.Double, a: IN): java.lang.Double =
    --- End diff --
    
    I'll change it back to make it backwards compatible. What about the new ones, @rxin?


---


[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    **[Test build #84472 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84472/testReport)** for PR 18113 at commit [`20216e4`](https://github.com/apache/spark/commit/20216e4cda40b75b98152954d1055cf062f818f0).


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r150392424
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java ---
    @@ -74,4 +71,40 @@
       public static <T> TypedColumn<T, Long> sumLong(MapFunction<T, Long> f) {
    --- End diff --
    
    It's already there, a bit higher up in the file


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r150391056
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java ---
    @@ -74,4 +71,40 @@
       public static <T> TypedColumn<T, Long> sumLong(MapFunction<T, Long> f) {
         return new TypedSumLong<T>(f).toColumnJava();
       }
    +
    +  /**
    +   * Min aggregate function for floating point (double) type.
    +   *
    +   * @since 2.3.0
    +   */
    +  public static <T> TypedColumn<T, Double> min(MapFunction<T, Double> f) {
    +    return new TypedMinDouble<T>(f).toColumnJava();
    +  }
    +
    +  /**
    +   * Min aggregate function for floating point (long, i.e. 64 bit integer) type.
    --- End diff --
    
    I don't think long is a floating point...


---



[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91605/
    Test FAILed.


---



[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    cc @cloud-fan 


---


[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r155688313
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala ---
    @@ -263,6 +262,25 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
           ("a", 4), ("b", 3))
       }
     
    +  test("typed aggregate: min, max") {
    +    val ds = Seq("a" -> 1, "a" -> 3, "b" -> 4, "b" -> -4, "b" -> 0).toDS()
    +    checkDataset(
    +      ds.groupByKey(_._1).agg(
    +        typed.min(_._2), typed.minLong(_._2), typed.max(_._2), typed.maxLong(_._2)),
    +      ("a", Some(1.0), Some(1L), Some(3.0), Some(3L)),
    +      ("b", Some(-4.0), Some(-4L), Some(4.0), Some(4L)))
    +  }
    +
    +  test("typed aggregate: empty") {
    +    val empty = Seq.empty[(Double, Double)].toDS
    +    val f = (x: (Double, Double)) => x._2
    +    val g = (x: (Long, Long)) => x._2
    +    checkDataset(
    +      empty.agg(typed.sum(f), typed.sumLong(g), typed.avg(f),
    --- End diff --
    
    sounds good


---



[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    ok to test


---



[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    @cloud-fan done, could you please have a look?


---



[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r154289888
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +94,91 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    +  override def reduce(b: Double, a: IN): Double = math.min(b, f(a))
    +  override def merge(b1: Double, b2: Double): Double = math.min(b1, b2)
    +  override def finish(reduction: Double): Double = {
    +    if (Double.PositiveInfinity == reduction) {
    --- End diff --
    
    After some more thought, option 3 is not reasonable, as throwing an exception is not a good idea in big data, especially in the last stage of a long-running job.
    
    Option 2 is weird, as it doesn't follow either Java/Scala or SQL.
    
    Let's go with option 1.
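
    Concretely, option 1 gives the behavior the test cases elsewhere in this thread expect (a sketch, assuming `typed.min`/`typed.max` from this patch and `spark.implicits._` in scope):

    ```
    val ds = Seq("a" -> 1, "a" -> 3).toDS()
    ds.groupByKey(_._1).agg(typed.min(_._2))   // ("a", Some(1.0))

    val f = (x: (Double, Double)) => x._2
    Seq.empty[(Double, Double)].toDS().agg(typed.min(f))   // a single row whose min is None
    ```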


---



[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r154887545
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +96,165 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, MutableDouble, java.lang.Double] {
    +  override def zero: MutableDouble = null
    +  override def reduce(b: MutableDouble, a: IN): MutableDouble = {
    +    if (b == null) {
    +      new MutableDouble(f(a))
    +    } else {
    +      b.value = math.min(b.value, f(a))
    +      b
    +    }
    +  }
    +  override def merge(b1: MutableDouble, b2: MutableDouble): MutableDouble = {
    +    if (b1 == null) {
    +      b2
    +    } else if (b2 == null) {
    +      b1
    +    } else {
    +      b1.value = math.min(b1.value, b2.value)
    +      b1
    +    }
    +  }
    +  override def finish(reduction: MutableDouble): java.lang.Double = {
    +    if (reduction == null) {
    +      null
    +    } else {
    +      reduction.toJavaDouble
    --- End diff --
    
    I think we can just return `reduction.value`, the compiler will do auto boxing for us
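
    That is, something like the following (a sketch reusing the `MutableDouble` buffer from the diff above); the implicit `Predef.double2Double` boxes the primitive on the way out:

    ```
    override def finish(reduction: MutableDouble): java.lang.Double =
      if (reduction == null) null else reduction.value   // auto-boxed to java.lang.Double
    ```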


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r154887918
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +96,165 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, MutableDouble, java.lang.Double] {
    +  override def zero: MutableDouble = null
    +  override def reduce(b: MutableDouble, a: IN): MutableDouble = {
    +    if (b == null) {
    +      new MutableDouble(f(a))
    +    } else {
    +      b.value = math.min(b.value, f(a))
    +      b
    +    }
    +  }
    +  override def merge(b1: MutableDouble, b2: MutableDouble): MutableDouble = {
    +    if (b1 == null) {
    +      b2
    +    } else if (b2 == null) {
    +      b1
    +    } else {
    +      b1.value = math.min(b1.value, b2.value)
    +      b1
    +    }
    +  }
    +  override def finish(reduction: MutableDouble): java.lang.Double = {
    +    if (reduction == null) {
    +      null
    +    } else {
    +      reduction.toJavaDouble
    +    }
    +  }
    +
    +  override def bufferEncoder: Encoder[MutableDouble] = Encoders.kryo[MutableDouble]
    +  override def outputEncoder: Encoder[java.lang.Double] = ExpressionEncoder[java.lang.Double]()
    +
    +  // Java api support
    +  def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x))
    +  def toColumnScala: TypedColumn[IN, Double] = {
    --- End diff --
    
    How are you going to handle null? I think we need to output `Option[Double]` for scala


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r153025270
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +94,91 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    +  override def reduce(b: Double, a: IN): Double = math.min(b, f(a))
    +  override def merge(b1: Double, b2: Double): Double = math.min(b1, b2)
    +  override def finish(reduction: Double): Double = {
    +    if (Double.PositiveInfinity == reduction) {
    --- End diff --
    
    Ah sorry, yes, I was only talking about the output type. 
    I don't have time right now, but I'll do it over the weekend and tag you when I am done.


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r150388484
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java ---
    @@ -74,4 +71,40 @@
       public static <T> TypedColumn<T, Long> sumLong(MapFunction<T, Long> f) {
         return new TypedSumLong<T>(f).toColumnJava();
       }
    +
    +  /**
    +   * Min aggregate function for floating point (double) type.
    +   *
    +   * @since 2.3.0
    +   */
    +  public static <T> TypedColumn<T, Double> min(MapFunction<T, Double> f) {
    +    return new TypedMinDouble<T>(f).toColumnJava();
    +  }
    +
    +  /**
    +   * Min aggregate function for floating point (double) type.
    --- End diff --
    
    nit: long type


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r153024025
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +94,91 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    +  override def reduce(b: Double, a: IN): Double = math.min(b, f(a))
    +  override def merge(b1: Double, b2: Double): Double = math.min(b1, b2)
    +  override def finish(reduction: Double): Double = {
    +    if (Double.PositiveInfinity == reduction) {
    --- End diff --
    
    The thing is that using `java.lang.Long` as the buffer type will introduce boxing and unboxing for every computation, while `MutableLong` won't.
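
    A sketch of the mutable-buffer idea (the `MutableLong` shape matches the class this patch later adds; the `reduce` here is illustrative):

    ```
    class MutableLong(var value: Long) extends Serializable

    def reduce(b: MutableLong, v: Long): MutableLong =
      if (b == null) new MutableLong(v)            // first value seen: allocate once
      else { b.value = math.min(b.value, v); b }   // then update in place, no boxing
    ```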


---



[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    @cloud-fan 
    Sorry, I misread the conclusion of the discussion. I reverted the initial API to exactly how it was before, while the new functions follow the SQL standard, as you agreed on two weeks ago.


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r150391105
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java ---
    @@ -74,4 +71,40 @@
       public static <T> TypedColumn<T, Long> sumLong(MapFunction<T, Long> f) {
    --- End diff --
    
    we can also add average, which is already implemented.


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r153024974
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +94,91 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    +  override def reduce(b: Double, a: IN): Double = math.min(b, f(a))
    +  override def merge(b1: Double, b2: Double): Double = math.min(b1, b2)
    +  override def finish(reduction: Double): Double = {
    +    if (Double.PositiveInfinity == reduction) {
    --- End diff --
    
    I'm ok with
    ```
      override def finish(reduction: MutableLong): Long = {
        if (reduction == null) {
          Long.MinValue
        } else {
          reduction.value
        }
      }
    ```


---



[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    **[Test build #84175 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84175/testReport)** for PR 18113 at commit [`f4d62e9`](https://github.com/apache/spark/commit/f4d62e98730c9119fb08f9f0e4b41f8373ebf61a).


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r154889049
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +96,165 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, MutableDouble, java.lang.Double] {
    +  override def zero: MutableDouble = null
    +  override def reduce(b: MutableDouble, a: IN): MutableDouble = {
    +    if (b == null) {
    +      new MutableDouble(f(a))
    +    } else {
    +      b.value = math.min(b.value, f(a))
    +      b
    +    }
    +  }
    +  override def merge(b1: MutableDouble, b2: MutableDouble): MutableDouble = {
    +    if (b1 == null) {
    +      b2
    +    } else if (b2 == null) {
    +      b1
    +    } else {
    +      b1.value = math.min(b1.value, b2.value)
    +      b1
    +    }
    +  }
    +  override def finish(reduction: MutableDouble): java.lang.Double = {
    +    if (reduction == null) {
    +      null
    +    } else {
    +      reduction.toJavaDouble
    --- End diff --
    
    Seems it's hard to achieve. How about
    ```
    // Scala 2 traits can't take constructor parameters, so `f` becomes an abstract member
    trait TypedMinDouble[IN, OUT] extends Aggregator[IN, MutableDouble, OUT] {
      def f: IN => Double
      def zero = ...
      def reduce = ...
    }
    
    class JavaTypedMinDouble[IN](val f: IN => Double) extends TypedMinDouble[IN, JDouble] {
      def finish: JDouble
    }
    
    class ScalaTypedMinDouble[IN](val f: IN => Double) extends TypedMinDouble[IN, Option[Double]] {
      def finish: Option[Double]
    }
    ```


---



[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    Can one of the admins verify this patch?


---


[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    **[Test build #91605 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91605/testReport)** for PR 18113 at commit [`ce6a7bf`](https://github.com/apache/spark/commit/ce6a7bfc9201e50ca10290e903e859890ef4fad4).


---



[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    **[Test build #84507 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84507/testReport)** for PR 18113 at commit [`c197cb1`](https://github.com/apache/spark/commit/c197cb1a930db390979f9926562347802280d1ea).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `trait TypedMinDouble[IN, OUT] extends Aggregator[IN, MutableDouble, OUT] `
      * `class JavaTypedMinDouble[IN](val f: IN => Double) extends TypedMinDouble[IN, java.lang.Double] `
      * `class ScalaTypedMinDouble[IN](val f: IN => Double) extends TypedMinDouble[IN, Option[Double]] `
      * `trait TypedMaxDouble[IN, OUT] extends Aggregator[IN, MutableDouble, OUT] `
      * `class JavaTypedMaxDouble[IN](val f: IN => Double) extends TypedMaxDouble[IN, java.lang.Double] `
      * `class ScalaTypedMaxDouble[IN](val f: IN => Double) extends TypedMaxDouble[IN, Option[Double]] `
      * `trait TypedMinLong[IN, OUT] extends Aggregator[IN, MutableLong, OUT] `
      * `class JavaTypedMinLong[IN](val f: IN => Long) extends TypedMinLong[IN, java.lang.Long] `
      * `class ScalaTypedMinLong[IN](val f: IN => Long) extends TypedMinLong[IN, Option[Long]] `
      * `trait TypedMaxLong[IN, OUT] extends Aggregator[IN, MutableLong, OUT] `
      * `class JavaTypedMaxLong[IN](val f: IN => Long) extends TypedMaxLong[IN, java.lang.Long] `
      * `class ScalaTypedMaxLong[IN](val f: IN => Long) extends TypedMaxLong[IN, Option[Long]] `
      * `class MutableLong(var value: Long) extends Serializable`
      * `class MutableDouble(var value: Double) extends Serializable`


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r150391130
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -76,26 +76,126 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
     
       // Java api support
       def this(f: MapFunction[IN, Object]) = this((x: IN) => f.call(x).asInstanceOf[Any])
    +  
       def toColumnJava: TypedColumn[IN, java.lang.Long] = {
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
       }
     }
     
    +class TypedAverage[IN](val f: IN => Double)
    +  extends Aggregator[IN, (Double, Long), Double] {
     
    -class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long), Double] {
       override def zero: (Double, Long) = (0.0, 0L)
       override def reduce(b: (Double, Long), a: IN): (Double, Long) = (f(a) + b._1, 1 + b._2)
    -  override def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2
    -  override def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) = {
    +  override def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) =
         (b1._1 + b2._1, b1._2 + b2._2)
    -  }
    +  override def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2
     
       override def bufferEncoder: Encoder[(Double, Long)] = ExpressionEncoder[(Double, Long)]()
       override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
     
       // Java api support
       def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])
    +
    +  def toColumnJava: TypedColumn[IN, java.lang.Double] = {
    +    toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
    +  }
    +}
    +
    +class TypedMinDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, Double, Double] {
    +
    +  override def zero: Double = Double.PositiveInfinity
    +  override def reduce(b: Double, a: IN): Double = math.min(b, f(a))
    +  override def merge(b1: Double, b2: Double): Double = math.min(b1, b2)
    +  override def finish(reduction: Double): Double = {
    +    if (Double.PositiveInfinity == reduction) {
    +      Double.NegativeInfinity
    +    } else {
    +      reduction
    +    }
    +  }
    +
    +  override def bufferEncoder: Encoder[Double] = ExpressionEncoder[Double]()
    +  override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
    +
    +  // Java api support
    +  def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])
    +
       def toColumnJava: TypedColumn[IN, java.lang.Double] = {
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMaxDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, Double, Double] {
    --- End diff --
    
    one line please


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r150391077
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -76,26 +76,126 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
     
       // Java api support
       def this(f: MapFunction[IN, Object]) = this((x: IN) => f.call(x).asInstanceOf[Any])
    +  
       def toColumnJava: TypedColumn[IN, java.lang.Long] = {
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
       }
     }
     
    +class TypedAverage[IN](val f: IN => Double)
    +  extends Aggregator[IN, (Double, Long), Double] {
     
    -class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long), Double] {
       override def zero: (Double, Long) = (0.0, 0L)
       override def reduce(b: (Double, Long), a: IN): (Double, Long) = (f(a) + b._1, 1 + b._2)
    -  override def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2
    -  override def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) = {
    +  override def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) =
         (b1._1 + b2._1, b1._2 + b2._2)
    -  }
    --- End diff --
    
    ditto


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118932092
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +97,67 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    --- End diff --
    
    A modified version of the existing `TypedSumDouble` that uses `java.lang.Double` as `BUF` and `OUT` looks like the following. It returns `null` when applied to an empty dataset.
    
        class TypedSumDouble[IN](val f: IN => Double)
              extends Aggregator[IN, java.lang.Double, java.lang.Double] {
          override def zero: java.lang.Double = null
          override def reduce(b: java.lang.Double, a: IN): java.lang.Double = {
            if (b == null) {
              f(a)
            } else {
              b + f(a)
            }
          }
          override def merge(b1: java.lang.Double, b2: java.lang.Double): java.lang.Double = {
            if (b1 == null) {
              b2
            } else if (b2 == null) {
              b1
            } else {
              b1 + b2
            }
          }
          override def finish(reduction: java.lang.Double): java.lang.Double = {
            reduction
          }
          override def bufferEncoder: Encoder[java.lang.Double] = ExpressionEncoder[java.lang.Double]()
          override def outputEncoder: Encoder[java.lang.Double] = ExpressionEncoder[java.lang.Double]()
    
          // Java api support
          def this(f: MapFunction[IN, java.lang.Double]) = this(x => f.call(x))
    
          def toColumnJava: TypedColumn[IN, java.lang.Double] = {
            toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
          }
        }
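
    Behavior of the modified version above (a sketch, assuming `spark.implicits._` in scope):

    ```
    val sumCol = new TypedSumDouble[Double]((d: Double) => d).toColumn

    Seq.empty[Double].toDS().select(sumCol).collect()   // Array(null): empty input
    Seq(1.0, 2.0).toDS().select(sumCol).collect()       // Array(3.0)
    ```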



---


[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118817938
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +97,67 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    --- End diff --
    
    When applied to an empty dataset, will we get `Double.PositiveInfinity`? That doesn't seem to match `aggregate.Min`.


---


[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r153020437
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -81,14 +77,13 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
       }
     }
     
    -
     class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long), Double] {
       override def zero: (Double, Long) = (0.0, 0L)
       override def reduce(b: (Double, Long), a: IN): (Double, Long) = (f(a) + b._1, 1 + b._2)
    -  override def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2
       override def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) = {
         (b1._1 + b2._1, b1._2 + b2._2)
       }
    +  override def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2
    --- End diff --
    
    Switched `finish` and `merge` around to make the ordering of the functions consistent.


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r153022290
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +94,91 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    +  override def reduce(b: Double, a: IN): Double = math.min(b, f(a))
    +  override def merge(b1: Double, b2: Double): Double = math.min(b1, b2)
    +  override def finish(reduction: Double): Double = {
    +    if (Double.PositiveInfinity == reduction) {
    --- End diff --
    
    Doesn't that boil down to what was there previously? https://github.com/apache/spark/pull/18113/commits/51783b55197cea6c130722838ec97ad6df5c92be
    
    ```
      override def zero: java.lang.Double = null
      override def reduce(b: java.lang.Double, a: IN): java.lang.Double =
        if (b == null) f(a) else math.max(b, f(a))
    
      override def merge(b1: java.lang.Double, b2: java.lang.Double): java.lang.Double = {
        if (b1 == null) {
          b2
        } else if (b2 == null) {
          b1
        } else {
          math.max(b1, b2)
        }
      }
      override def finish(reduction: java.lang.Double): java.lang.Double = reduction
    ```
    
    Here we just return null in case it's an empty set, or if we hit the edge case you just mentioned. You rejected it on the 8th of June because you were concerned about boxing performance.


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r153025008
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +94,91 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    +  override def reduce(b: Double, a: IN): Double = math.min(b, f(a))
    +  override def merge(b1: Double, b2: Double): Double = math.min(b1, b2)
    +  override def finish(reduction: Double): Double = {
    +    if (Double.PositiveInfinity == reduction) {
    --- End diff --
    
    This is more Scala-friendly, and the semantics look reasonable, as we discussed before: min of empty input is Long/Double.MinValue.


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118812026
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -38,7 +38,6 @@ class TypedSumDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Dou
     
       // Java api support
       def this(f: MapFunction[IN, java.lang.Double]) = this(x => f.call(x).asInstanceOf[Double])
    -
    --- End diff --
    
    nit: keep the original style (do not delete the blank line)


---


[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    @gatorsmile @cloud-fan 
    Could you have a look please?


---



[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    Can one of the admins verify this patch?


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r155374041
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala ---
    @@ -263,6 +262,25 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
           ("a", 4), ("b", 3))
       }
     
    +  test("typed aggregate: min, max") {
    +    val ds = Seq("a" -> 1, "a" -> 3, "b" -> 4, "b" -> -4, "b" -> 0).toDS()
    +    checkDataset(
    +      ds.groupByKey(_._1).agg(
    +        typed.min(_._2), typed.minLong(_._2), typed.max(_._2), typed.maxLong(_._2)),
    +      ("a", Some(1.0), Some(1L), Some(3.0), Some(3L)),
    +      ("b", Some(-4.0), Some(-4L), Some(4.0), Some(4L)))
    +  }
    +
    +  test("typed aggregate: empty") {
    +    val empty = Seq.empty[(Double, Double)].toDS
    --- End diff --
    
    That won't change anything, unfortunately. The difference between the empty and the non-empty test cases is that the latter does a `groupByKey`. If that is done on an empty dataset, no row is returned at all, which makes it impossible to verify that a `None` is returned.
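
    As a rough illustration of that difference, a minimal sketch (assuming a SparkSession named `spark` with its implicits imported and the `typed` functions from `org.apache.spark.sql.expressions.scalalang` in scope):

        import spark.implicits._
        import org.apache.spark.sql.expressions.scalalang.typed

        val empty = Seq.empty[(String, Int)].toDS()
        val f = (x: (String, Int)) => x._2.toDouble

        // A global aggregation on an empty Dataset still yields one row,
        // so the aggregator's zero/finish path is observable here.
        empty.agg(typed.avg(f)).show()

        // Grouping first yields no keys on an empty Dataset, hence no rows
        // at all, so there is nothing to check a None against.
        empty.groupByKey(_._1).agg(typed.avg(f)).show()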


---



[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    Hi, it has been a while, but I can pick it back up when I have time next weekend or so, if that's OK.


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r120975148
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -26,43 +26,64 @@ import org.apache.spark.sql.expressions.Aggregator
     // This file defines internal implementations for aggregators.
     ////////////////////////////////////////////////////////////////////////////////////////////////////
     
    +class TypedSumDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, java.lang.Double, java.lang.Double] {
    +
    +  override def zero: java.lang.Double = null
    +  override def reduce(b: java.lang.Double, a: IN): java.lang.Double =
    --- End diff --
    
    I'm afraid this may cause a performance regression because of the boxing. Actually, I think it's fine for the typed operations to have different semantics from SQL. @rxin what do you think? Can typed sum return `0` for empty input?
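
    For contrast with the boxed version quoted above, a sketch of the primitive-buffer shape (illustrative class name; imports as in typedaggregators.scala):

        // Primitive buffer: reduce and merge operate on unboxed doubles,
        // so no java.lang.Double allocation happens per input row.
        class PrimitiveSumDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
          override def zero: Double = 0.0
          override def reduce(b: Double, a: IN): Double = b + f(a)
          override def merge(b1: Double, b2: Double): Double = b1 + b2
          override def finish(reduction: Double): Double = reduction

          override def bufferEncoder: Encoder[Double] = ExpressionEncoder[Double]()
          override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
        }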


---


[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118939565
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +97,67 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    --- End diff --
    
    Ah, I see my misunderstanding: in `reduce` I tried to also have an `if` for `f(a) == null` because of the previously mentioned implicit casting issue. This would force a `java.lang.Double` to be returned by the function, as `Double == null` doesn't make sense in Scala.

    I have updated the code, please have a look :) Because `OUT` is already a `java.lang.Double`, we do not need the `toColumnJava`. As a result of `OUT` being `java.lang.Double`, however, we do need a `toColumnScala` to accommodate `val f = (x: (Double, Double)) => x._2; empty.agg(typed.min(f)).show()`.
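
    For clarity, a sketch of what such a `toColumnScala` could look like, mirroring the existing `toColumnJava` pattern (a sketch only, not necessarily the final patch):

        // With OUT = java.lang.Double, Scala callers expecting a
        // TypedColumn[IN, Double] need a cast in the opposite direction:
        def toColumnScala: TypedColumn[IN, Double] = {
          toColumn.asInstanceOf[TypedColumn[IN, Double]]
        }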


---


[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r153020474
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -38,13 +38,11 @@ class TypedSumDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Dou
     
       // Java api support
       def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])
    -
    --- End diff --
    
    removed some blank lines to make the functions more consistent


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r155133571
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala ---
    @@ -263,6 +262,25 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
           ("a", 4), ("b", 3))
       }
     
    +  test("typed aggregate: min, max") {
    +    val ds = Seq("a" -> 1, "a" -> 3, "b" -> 4, "b" -> -4, "b" -> 0).toDS()
    +    checkDataset(
    +      ds.groupByKey(_._1).agg(
    +        typed.min(_._2), typed.minLong(_._2), typed.max(_._2), typed.maxLong(_._2)),
    +      ("a", Some(1.0), Some(1L), Some(3.0), Some(3L)),
    +      ("b", Some(-4.0), Some(-4L), Some(4.0), Some(4L)))
    +  }
    +
    +  test("typed aggregate: empty") {
    +    val empty = Seq.empty[(Double, Double)].toDS
    --- End diff --
    
    why don't you follow the non-empty test case and use `(String, Int)`?


---



[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    **[Test build #84175 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84175/testReport)** for PR 18113 at commit [`f4d62e9`](https://github.com/apache/spark/commit/f4d62e98730c9119fb08f9f0e4b41f8373ebf61a).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long), Double] `
      * `class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] `
      * `class TypedMaxDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] `


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118841626
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +97,67 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    --- End diff --
    
    Yes, that is what I mean; sorry for the confusion. There is therefore no nice solution, unfortunately. I agree that using `java.lang.Double` is probably the simplest approach and therefore the way to go. Let's see what cloud-fan says before I update the PR.


---


[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r121025561
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -26,43 +26,64 @@ import org.apache.spark.sql.expressions.Aggregator
     // This file defines internal implementations for aggregators.
     ////////////////////////////////////////////////////////////////////////////////////////////////////
     
    +class TypedSumDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, java.lang.Double, java.lang.Double] {
    +
    +  override def zero: java.lang.Double = null
    +  override def reduce(b: java.lang.Double, a: IN): java.lang.Double =
    --- End diff --
    
    This changes the signature and is not backwards compatible. We should stick with whatever that was before.



---


[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118821077
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +97,67 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    --- End diff --
    
    Hi, thanks for having a look. This is actually not an issue, because on an empty dataset nothing is returned. For more details, you could have a look at the existing tests: the `agg` function is called on a `KeyValueGroupedDataset` object, which is returned by `groupByKey`. This ensures the aggregation is only done per key.

    I have added an additional unit test to demonstrate this.

    Regarding `Double.PositiveInfinity`, I could change it to `Double.MaxValue` to be in line with `Long.MaxValue` if you'd prefer that. I personally think infinity makes more sense, although that is inconsistent with the `Long` case, because there is no `Long.PositiveInfinity`.
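
    For reference, the concrete values under discussion (Scala REPL):

        Double.PositiveInfinity   // Infinity: a true identity element for min
        Double.MaxValue           // 1.7976931348623157E308: largest finite double
        Long.MaxValue             // 9223372036854775807: Long has no infinity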


---


[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r150388551
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -76,26 +77,130 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
     
       // Java api support
       def this(f: MapFunction[IN, Object]) = this((x: IN) => f.call(x).asInstanceOf[Any])
    +  
       def toColumnJava: TypedColumn[IN, java.lang.Long] = {
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
       }
     }
     
    +class TypedAverage[IN](val f: IN => Double)
    +  extends Aggregator[IN, (Double, Long), Double] {
     
    -class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long), Double] {
       override def zero: (Double, Long) = (0.0, 0L)
       override def reduce(b: (Double, Long), a: IN): (Double, Long) = (f(a) + b._1, 1 + b._2)
    -  override def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2
    -  override def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) = {
    +  override def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) =
         (b1._1 + b2._1, b1._2 + b2._2)
    -  }
    +  override def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2
     
       override def bufferEncoder: Encoder[(Double, Long)] = ExpressionEncoder[(Double, Long)]()
       override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
     
       // Java api support
       def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])
    +
       def toColumnJava: TypedColumn[IN, java.lang.Double] = {
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, Double, Double] {
    +
    +  override def zero: Double = Double.MaxValue
    +  override def reduce(b: Double, a: IN): Double = math.min(b, f(a))
    +  override def merge(b1: Double, b2: Double): Double = math.min(b1, b2)
    +  override def finish(reduction: Double): Double = {
    +    if (Double.MaxValue == reduction) {
    +      Double.NegativeInfinity
    +    }
    +    else {
    +      reduction
    +    }
    +  }
    +
    +  override def bufferEncoder: Encoder[Double] = ExpressionEncoder[Double]()
    +  override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
    +
    +  // Java api support
    +  def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])
    +
    +  def toColumnJava: TypedColumn[IN, java.lang.Double] = {
    +    toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
    +  }
    +}
    +
    +class TypedMaxDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, Double, Double] {
    +
    +  override def zero: Double = Double.MinValue
    --- End diff --
    
    can we use `Double.PositiveInfinity` as initial value?


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r153025148
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +94,91 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    +  override def reduce(b: Double, a: IN): Double = math.min(b, f(a))
    +  override def merge(b1: Double, b2: Double): Double = math.min(b1, b2)
    +  override def finish(reduction: Double): Double = {
    +    if (Double.PositiveInfinity == reduction) {
    --- End diff --
    
    Actually we have 3 choices for empty input here:
    1. return null and the return type is java.lang.Long
    2. return Long.MinValue and return type is primitive long
    3. throw exception and return type is primitive long
    
    cc @gatorsmile @HyukjinKwon @srowen 


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r150391065
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -76,26 +76,126 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
     
       // Java api support
       def this(f: MapFunction[IN, Object]) = this((x: IN) => f.call(x).asInstanceOf[Any])
    +  
       def toColumnJava: TypedColumn[IN, java.lang.Long] = {
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
       }
     }
     
    +class TypedAverage[IN](val f: IN => Double)
    +  extends Aggregator[IN, (Double, Long), Double] {
     
    -class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long), Double] {
    --- End diff --
    
    ditto


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r121030894
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -26,43 +26,64 @@ import org.apache.spark.sql.expressions.Aggregator
     // This file defines internal implementations for aggregators.
     ////////////////////////////////////////////////////////////////////////////////////////////////////
     
    +class TypedSumDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, java.lang.Double, java.lang.Double] {
    +
    +  override def zero: java.lang.Double = null
    +  override def reduce(b: java.lang.Double, a: IN): java.lang.Double =
    --- End diff --
    
    Yeah, for this one we should stick with what it was, but for new ones like typed max/min, can we have different semantics from SQL? E.g. return `Long.MinValue` for typed min on empty input, while SQL would return null.
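
    A sketch of the divergence proposed here (hedged: `typed.minLong` is the new function under review, and `ds`/`df` are assumed inputs):

        import org.apache.spark.sql.functions.min

        // Typed API: min over an empty group would fall back to a sentinel.
        ds.groupByKey(_._1).agg(typed.minLong(_._2))  // empty input: Long.MinValue

        // SQL / DataFrame API keeps the SQL semantics.
        df.agg(min("value"))                          // empty input: null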


---


[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r150392495
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -76,26 +76,126 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
     
       // Java api support
       def this(f: MapFunction[IN, Object]) = this((x: IN) => f.call(x).asInstanceOf[Any])
    +  
       def toColumnJava: TypedColumn[IN, java.lang.Long] = {
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
       }
     }
     
    +class TypedAverage[IN](val f: IN => Double)
    +  extends Aggregator[IN, (Double, Long), Double] {
     
    -class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long), Double] {
       override def zero: (Double, Long) = (0.0, 0L)
       override def reduce(b: (Double, Long), a: IN): (Double, Long) = (f(a) + b._1, 1 + b._2)
    -  override def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2
    --- End diff --
    
    The order of the functions is now consistent among all aggregation functions: zero, reduce, merge, finish. Hence the swap of the merge and finish functions.
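
    That is, every aggregator in the file now follows the same skeleton (an illustrative class; imports as in typedaggregators.scala):

        class SomeTypedAgg[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
          override def zero: Double = 0.0                               // 1. zero
          override def reduce(b: Double, a: IN): Double = b + f(a)      // 2. reduce
          override def merge(b1: Double, b2: Double): Double = b1 + b2  // 3. merge
          override def finish(reduction: Double): Double = reduction    // 4. finish

          override def bufferEncoder: Encoder[Double] = ExpressionEncoder[Double]()
          override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
        }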


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118931501
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +97,67 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    --- End diff --
    
    I don't get it. Why would using `java.lang.Double` as `BUF` and `OUT` leak internals?

    The signature of your `f` looks weird. Why is it `(Double, java.lang.Double) => java.lang.Double`?


---


[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118840234
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +97,67 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    --- End diff --
    
    Which aggregators do you mean by 'those aggregators'?

    Wouldn't it make more sense to put it in an `Option`? The whole point of Datasets is to provide proper typing. If someone prefers the other way, they can still do it by passing in a `Column` instead of a `TypedColumn`: https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/Dataset.html#agg(org.apache.spark.sql.Column,%20org.apache.spark.sql.Column...)


---


[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r155667834
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala ---
    @@ -263,6 +262,25 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
           ("a", 4), ("b", 3))
       }
     
    +  test("typed aggregate: min, max") {
    +    val ds = Seq("a" -> 1, "a" -> 3, "b" -> 4, "b" -> -4, "b" -> 0).toDS()
    +    checkDataset(
    +      ds.groupByKey(_._1).agg(
    +        typed.min(_._2), typed.minLong(_._2), typed.max(_._2), typed.maxLong(_._2)),
    +      ("a", Some(1.0), Some(1L), Some(3.0), Some(3L)),
    +      ("b", Some(-4.0), Some(-4L), Some(4.0), Some(4L)))
    +  }
    +
    +  test("typed aggregate: empty") {
    +    val empty = Seq.empty[(Double, Double)].toDS
    +    val f = (x: (Double, Double)) => x._2
    +    val g = (x: (Long, Long)) => x._2
    +    checkDataset(
    +      empty.agg(typed.sum(f), typed.sumLong(g), typed.avg(f),
    --- End diff --
    
    Yes, I did not notice that. Given that it's a change in core, maybe we should create a separate JIRA for it and make this one depend on that?


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r155068746
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +96,165 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, MutableDouble, java.lang.Double] {
    +  override def zero: MutableDouble = null
    +  override def reduce(b: MutableDouble, a: IN): MutableDouble = {
    +    if (b == null) {
    +      new MutableDouble(f(a))
    +    } else {
    +      b.value = math.min(b.value, f(a))
    +      b
    +    }
    +  }
    +  override def merge(b1: MutableDouble, b2: MutableDouble): MutableDouble = {
    +    if (b1 == null) {
    +      b2
    +    } else if (b2 == null) {
    +      b1
    +    } else {
    +      b1.value = math.min(b1.value, b2.value)
    +      b1
    +    }
    +  }
    +  override def finish(reduction: MutableDouble): java.lang.Double = {
    +    if (reduction == null) {
    +      null
    +    } else {
    +      reduction.toJavaDouble
    --- End diff --
    
    Done


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118822478
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +97,67 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    --- End diff --
    
    Hmm, it seems that typedAvg (already implemented) returns NaN, while aggregate.Min returns null.
    Aligning it with typedAvg would not be possible for minLong, as NaN is only available for Double, of course. Another possibility would be to wrap the result in an `Option`, but that again is not completely in line with aggregate.Min. This is because aggregate.Min is expression-based and has built-in support for null, as it extends aggregate.interfaces.DeclarativeAggregate, whereas the typed aggregators extend Aggregator.
    Aligning this properly seems like a huge refactor. What do you think the best approach is?
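
    To make the current inconsistency concrete, a small sketch (assuming a SparkSession named `spark` with its implicits imported):

        import spark.implicits._
        import org.apache.spark.sql.expressions.scalalang.typed
        import org.apache.spark.sql.functions.min

        val empty = Seq.empty[(String, Double)].toDS()
        val f = (x: (String, Double)) => x._2

        // Already-implemented typed average: 0.0 / 0L gives Double.NaN.
        empty.agg(typed.avg(f)).show()

        // Expression-based min: null for empty input.
        empty.toDF("k", "v").agg(min("v")).show()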


---


[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118856080
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +97,67 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    --- End diff --
    
    I've tried it and it works.
    
    You can't just do `min(b1, b2)` in `merge`, because `b1` and `b2` can be null. When `b1` is null, just output `b2`, and vice versa. We only call `min` on them when both are non-null.
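
    In other words, the merge would look roughly like this (a sketch with a `java.lang.Double` buffer):

        override def merge(b1: java.lang.Double, b2: java.lang.Double): java.lang.Double = {
          if (b1 == null) {
            b2
          } else if (b2 == null) {
            b1
          } else {
            // Both sides are non-null, so unboxing and comparing is safe.
            math.min(b1, b2)
          }
        }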


---


[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    **[Test build #91605 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91605/testReport)** for PR 18113 at commit [`ce6a7bf`](https://github.com/apache/spark/commit/ce6a7bfc9201e50ca10290e903e859890ef4fad4).
     * This patch **fails Java style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118800681
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +97,67 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    +  override def reduce(b: Double, a: IN): Double = if (b < f(a)) b else f(a)
    +  override def merge(b1: Double, b2: Double): Double = if (b1 < b2) b1 else b2
    +  override def finish(reduction: Double): Double = reduction
    +
    +  override def bufferEncoder: Encoder[Double] = ExpressionEncoder[Double]()
    +  override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
    +
    +  // Java api support
    +  def this(f: MapFunction[IN, java.lang.Double]) = this(x => f.call(x).asInstanceOf[Double])
    +  def toColumnJava: TypedColumn[IN, java.lang.Double] = {
    +    toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
    +  }
    +}
    +
    +class TypedMaxDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.NegativeInfinity
    +  override def reduce(b: Double, a: IN): Double = if (b > f(a)) b else f(a)
    +  override def merge(b1: Double, b2: Double): Double = if (b1 > b2) b1 else b2
    +  override def finish(reduction: Double): Double = reduction
    +
    +  override def bufferEncoder: Encoder[Double] = ExpressionEncoder[Double]()
    +  override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
    +
    +  // Java api support
    +  def this(f: MapFunction[IN, java.lang.Double]) = this(x => f.call(x).asInstanceOf[Double])
    +  def toColumnJava: TypedColumn[IN, java.lang.Double] = {
    +    toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
    +  }
    +}
    +
    +class TypedMinLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long] {
    +  override def zero: Long = Long.MaxValue
    +  override def reduce(b: Long, a: IN): Long = if (b < f(a)) b else f(a)
    +  override def merge(b1: Long, b2: Long): Long = if (b1 < b2) b1 else b2
    +  override def finish(reduction: Long): Long = reduction
    +
    +  override def bufferEncoder: Encoder[Long] = ExpressionEncoder[Long]()
    +  override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]()
    +
    +  // Java api support
    +  def this(f: MapFunction[IN, java.lang.Long]) = this(x => f.call(x).asInstanceOf[Long])
    +  def toColumnJava: TypedColumn[IN, java.lang.Long] = {
    +    toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
    +  }
    +}
    +
    +class TypedMaxLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long] {
    +  override def zero: Long = Long.MinValue
    +  override def reduce(b: Long, a: IN): Long = if (b > f(a)) b else f(a)
    +  override def merge(b1: Long, b2: Long): Long = if (b1 > b2) b1 else b2
    +  override def finish(reduction: Long): Long = reduction
    +
    +  override def bufferEncoder: Encoder[Long] = ExpressionEncoder[Long]()
    +  override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]()
    +
    +  // Java api support
    +  def this(f: MapFunction[IN, java.lang.Long]) = this(x => f.call(x).asInstanceOf[Long])
    +  def toColumnJava: TypedColumn[IN, java.lang.Long] = {
    +    toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
    +  }
    +}
    --- End diff --
    
    Add a new line here.


---


[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r118914424
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -99,3 +97,67 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
         toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
       }
     }
    +
    +class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    +  override def zero: Double = Double.PositiveInfinity
    --- End diff --
    
    Turns out I made a typo which caused me to miss a permutation of null handling in the parameters...

    Comparing both solutions (tuple with `OUT` as `java.lang.Double` vs. non-tuple with both `BUF` and `OUT` as `java.lang.Double`), it seems we have the following trade-offs:
    - the tuple will require more data to be shuffled around, as we are adding an additional value
    - the non-tuple solution requires the developer to know a bit about the internals, i.e.:
    `val tuple = (x: (Double, Double)) => x._2; emptyDataSet.agg(typed.min(tuple)).show()`
    `val nontuple = (x: (Double, java.lang.Double)) => x._2; emptyDataSet.agg(typed.min(nontuple)).show()`

    This is because the function `f` passed into `typed.min` outputs a `BUF`, forcing the caller to know about the internals.
    Given that users can always implement their own (non-tuple version) if needed, I'd argue in favor of the tupled solution because it is a bit more developer-friendly. What do you think?
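
    For completeness, a sketch of the tupled variant (illustrative: a (Boolean, Double) buffer that tracks whether any value was seen):

        class TupledMinDouble[IN](val f: IN => Double)
          extends Aggregator[IN, (Boolean, Double), java.lang.Double] {

          override def zero: (Boolean, Double) = (false, Double.PositiveInfinity)
          override def reduce(b: (Boolean, Double), a: IN): (Boolean, Double) =
            (true, math.min(b._2, f(a)))
          override def merge(b1: (Boolean, Double), b2: (Boolean, Double)): (Boolean, Double) =
            (b1._1 || b2._1, math.min(b1._2, b2._2))
          override def finish(r: (Boolean, Double)): java.lang.Double =
            if (r._1) java.lang.Double.valueOf(r._2) else null  // empty input: null

          override def bufferEncoder: Encoder[(Boolean, Double)] =
            ExpressionEncoder[(Boolean, Double)]()
          // Assumes ExpressionEncoder can handle the boxed java.lang.Double;
          // worth verifying against the encoder framework.
          override def outputEncoder: Encoder[java.lang.Double] =
            ExpressionEncoder[java.lang.Double]()
        }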


---


[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by setjet <gi...@git.apache.org>.
Github user setjet commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    OK, sounds good. What about doubles? We could return the proper mathematical definition, but that would not be consistent with longs.



---



[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    retest this please


---



[GitHub] spark issue #18113: [SPARK-20890][SQL] Added min and max typed aggregation f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18113
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #18113: [SPARK-20890][SQL] Added min and max typed aggreg...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18113#discussion_r150083215
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala ---
    @@ -26,43 +26,64 @@ import org.apache.spark.sql.expressions.Aggregator
     // This file defines internal implementations for aggregators.
     ////////////////////////////////////////////////////////////////////////////////////////////////////
     
    +class TypedSumDouble[IN](val f: IN => Double)
    +  extends Aggregator[IN, java.lang.Double, java.lang.Double] {
    +
    +  override def zero: java.lang.Double = null
    +  override def reduce(b: java.lang.Double, a: IN): java.lang.Double =
    +    if (b == null) f(a) else b + f(a)
    +
    +  override def merge(b1: java.lang.Double, b2: java.lang.Double): java.lang.Double = {
    +    if (b1 == null) {
    +      b2
    +    } else if (b2 == null) {
    +      b1
    +    } else {
    +      b1 + b2
    +    }
    +  }
    +  override def finish(reduction: java.lang.Double): java.lang.Double = reduction
     
    -class TypedSumDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
    -  override def zero: Double = 0.0
    --- End diff --
    
    so we will return 0.0 for empty input, let's not change this behavior


---
