Posted to reviews@spark.apache.org by davies <gi...@git.apache.org> on 2016/01/28 00:34:26 UTC

[GitHub] spark pull request: [SPARK-12963] Improve performance of stddev/va...

GitHub user davies opened a pull request:

    https://github.com/apache/spark/pull/10960

    [SPARK-12963] Improve performance of stddev/variance

    As benchmarked and discussed here: https://github.com/apache/spark/pull/10786/files#r50038294, thanks to codegen, a declarative aggregate function can be much faster than an imperative one.
    
    This PR is based on #10944 
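
    The update expressions in this patch follow Welford's online algorithm: the buffer tracks (count, avg, m2), so variance falls out in a single pass. As a rough illustration only (a Python sketch with names of my choosing, not the PR's Scala code):

    ```python
    # Welford-style online update for (count, avg, m2), the same recurrence
    # the declarative updateExpressions encode. m2 is the running sum of
    # squared deviations from the mean.
    def update(state, x):
        count, avg, m2 = state
        n = count + 1.0
        delta = x - avg
        delta_n = delta / n
        new_avg = avg + delta_n
        new_m2 = m2 + delta * (delta - delta_n)
        return (n, new_avg, new_m2)

    def variance_pop(values):
        state = (0.0, 0.0, 0.0)
        for x in values:
            state = update(state, x)
        count, _, m2 = state
        return m2 / count  # population variance; divide by count - 1 for sample
    ```

    Expressed declaratively like this, the whole update inlines into generated code instead of going through an imperative aggregation buffer.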

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/davies/spark stddev

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/10960.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #10960
    
----
commit b4db00675bc3c51ddf8735cace522a5d771cf7e2
Author: Davies Liu <da...@databricks.com>
Date:   2016-01-27T07:43:40Z

    cleanup whole stage codegen

commit 70a7c7edd1988c7dd69bccc8e563c9943775bd2c
Author: Davies Liu <da...@databricks.com>
Date:   2016-01-27T23:22:33Z

    improve stddev and variance

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-177095115
  
    @nongli @rxin Could you review this one?



[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-177080266
  
    **[Test build #2484 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2484/consoleFull)** for PR 10960 at commit [`7e57a1a`](https://github.com/apache/spark/commit/7e57a1ab87f27659fd11f63ee86dd073f9687dc9).



[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178522673
  
    Merged build finished. Test FAILed.



[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178486473
  
    test this please



[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178801022
  
    **[Test build #50572 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50572/consoleFull)** for PR 10960 at commit [`fe6fe50`](https://github.com/apache/spark/commit/fe6fe50eae2490e7669a40200c34483533d2b632).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-12913] Improve performance of stat func...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176533334
  
    **[Test build #50322 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50322/consoleFull)** for PR 10960 at commit [`ae78e81`](https://github.com/apache/spark/commit/ae78e81f70af03a79cbfdbb22f06cc48c04781d4).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-177093515
  
    **[Test build #2484 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2484/consoleFull)** for PR 10960 at commit [`7e57a1a`](https://github.com/apache/spark/commit/7e57a1ab87f27659fd11f63ee86dd073f9687dc9).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178733914
  
    **[Test build #50572 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50572/consoleFull)** for PR 10960 at commit [`fe6fe50`](https://github.com/apache/spark/commit/fe6fe50eae2490e7669a40200c34483533d2b632).



[GitHub] spark pull request: [SPARK-12963] Improve performance of stddev/va...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176068656
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50265/
    Test FAILed.



[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178494238
  
    **[Test build #50565 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50565/consoleFull)** for PR 10960 at commit [`9b74195`](https://github.com/apache/spark/commit/9b74195237cccfe23463e737687e7e06331e769f).



[GitHub] spark pull request: [SPARK-12963] Improve performance of stddev/va...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176315428
  
    @davies Did you get a chance to test whole-stage codegen with higher-order statistics like skewness? If it works, the cleanest solution would be changing `CentralMomentAgg` to declarative and then make all existing univariate summary statistics call it.
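
    For context, the pairwise merge the patch later adopts handles higher-order moments too; the formulas come from the reference cited in the diff (Wikipedia, "Algorithms for calculating variance", higher-order statistics). A numeric sketch of the merge up to the third moment (my own illustration, not Spark code):

    ```python
    # Compute (n, avg, m2, m3) directly, and merge two such states pairwise.
    # Merging per-partition states this way is what lets skewness-grade
    # statistics run as a declarative aggregate.
    def moments(values):
        n = float(len(values))
        avg = sum(values) / n
        m2 = sum((x - avg) ** 2 for x in values)
        m3 = sum((x - avg) ** 3 for x in values)
        return (n, avg, m2, m3)

    def merge(a, b):
        n1, avg1, m2a, m3a = a
        n2, avg2, m2b, m3b = b
        n = n1 + n2
        delta = avg2 - avg1
        delta_n = delta / n if n != 0.0 else 0.0
        avg = avg1 + delta_n * n2
        m2 = m2a + m2b + delta * delta_n * n1 * n2
        m3 = (m3a + m3b
              + delta_n * delta_n * delta * n1 * n2 * (n1 - n2)
              + 3.0 * delta_n * (n1 * m2b - n2 * m2a))
        return (n, avg, m2, m3)
    ```

    Merging the moments of two partitions should reproduce the moments of their concatenation, which is exactly the property `mergeExpressions` needs.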



[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533751
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -57,173 +55,170 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
     
       override def dataType: DataType = DoubleType
     
    -  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val avg = AttributeReference("avg", DoubleType, nullable = false)()
    +  protected val m2 = AttributeReference("m2", DoubleType, nullable = false)()
    +  protected val m3 = AttributeReference("m3", DoubleType, nullable = false)()
    +  protected val m4 = AttributeReference("m4", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes = Seq(count, avg, m2, m3, m4).take(momentOrder + 1)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* avg = */ Literal(0.0),
    +    /* m2 = */ Literal(0.0),
    +    /* m3 = */ Literal(0.0),
    +    /* m4 = */ Literal(0.0)
    +  ).take(momentOrder + 1)
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    +    val delta = child - avg
    +    val deltaN = delta / n
    +    val newAvg = avg + deltaN
    +    val newM2 = m2 + delta * (delta - deltaN)
    +
    +    val delta2 = delta * delta
    +    val deltaN2 = deltaN * deltaN
    +    val newM3 = m3 - Literal(3.0) * deltaN * newM2 + delta * (delta2 - deltaN2)
    +
    +    val newM4 = m4 - Literal(4.0) * deltaN * newM3 - Literal(6.0) * deltaN2 * newM2 +
    +      delta * (delta * delta2 - deltaN * deltaN2)
    +
    +    if (child.nullable) {
    +      Seq(
    +        /* count = */ If(IsNull(child), count, n),
    +        /* avg = */ If(IsNull(child), avg, newAvg),
    +        /* m2 = */ If(IsNull(child), m2, newM2),
    +        /* m3 = */ If(IsNull(child), m3, newM3),
    +        /* m4 = */ If(IsNull(child), m4, newM4)
    +      ).take(momentOrder + 1)
    +    } else {
    +      Seq(
    +        /* count = */ n,
    +        /* avg = */ newAvg,
    +        /* m2 = */ newM2,
    +        /* m3 = */ newM3,
    +        /* m4 = */ newM4
    +      ).take(momentOrder + 1)
    +    }
    +  }
     
    -  override def checkInputDataTypes(): TypeCheckResult =
    -    TypeUtils.checkForNumericExpr(child.dataType, s"function $prettyName")
    +  override lazy val mergeExpressions: Seq[Expression] = {
     
    -  override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
    +    val n1 = count.left
    +    val n2 = count.right
    +    val n = n1 + n2
    +    val delta = avg.right - avg.left
    +    val deltaN = If(EqualTo(n, Literal(0.0)), Literal(0.0), delta / n)
    +    val newAvg = avg.left + deltaN * n2
     
    -  /**
    -   * Size of aggregation buffer.
    -   */
    -  private[this] val bufferSize = 5
    +    // higher order moments computed according to:
    +    // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
    +    val newM2 = m2.left + m2.right + delta * deltaN * n1 * n2
    +    val newM3 = if (momentOrder >= 3) {
    +      m3.left + m3.right + deltaN * deltaN * delta * n1 * n2 * (n1 - n2) +
    +        Literal(3.0) * deltaN * (n1 * m2.right - n2 * m2.left)
    +    } else {
    +      Literal(0.0)
    +    }
    +    val newM4 = if (momentOrder >= 4) {
    +      m4.left + m4.right +
    +        deltaN * deltaN * deltaN * delta * n1 * n2 * (n1 * n1 - n1 * n2 + n2 * n2) +
    +        Literal(6.0) * deltaN * deltaN * (n1 * n1 * m2.right + n2 * n2 * m2.left) +
    +        Literal(4.0) * deltaN * (n1 * m3.right - n2 * m3.left)
    +    } else {
    +      Literal(0.0)
    +    }
     
    -  override val aggBufferAttributes: Seq[AttributeReference] = Seq.tabulate(bufferSize) { i =>
    -    AttributeReference(s"M$i", DoubleType)()
    +    Seq(
    +      /* count = */ n,
    +      /* avg = */ newAvg,
    +      /* m2 = */ newM2,
    +      /* m3 = */ newM3,
    +      /* m4 = */ newM4
    +    ).take(momentOrder + 1)
       }
    +}
     
    -  // Note: although this simply copies aggBufferAttributes, this common code can not be placed
    -  // in the superclass because that will lead to initialization ordering issues.
    -  override val inputAggBufferAttributes: Seq[AttributeReference] =
    -    aggBufferAttributes.map(_.newInstance())
    -
    -  // buffer offsets
    -  private[this] val nOffset = mutableAggBufferOffset
    -  private[this] val meanOffset = mutableAggBufferOffset + 1
    -  private[this] val secondMomentOffset = mutableAggBufferOffset + 2
    -  private[this] val thirdMomentOffset = mutableAggBufferOffset + 3
    -  private[this] val fourthMomentOffset = mutableAggBufferOffset + 4
    -
    -  // frequently used values for online updates
    -  private[this] var delta = 0.0
    -  private[this] var deltaN = 0.0
    -  private[this] var delta2 = 0.0
    -  private[this] var deltaN2 = 0.0
    -  private[this] var n = 0.0
    -  private[this] var mean = 0.0
    -  private[this] var m2 = 0.0
    -  private[this] var m3 = 0.0
    -  private[this] var m4 = 0.0
    +// Compute the population standard deviation of a column
    +case class StddevPop(child: Expression) extends CentralMomentAgg(child) {
    --- End diff --
    
    Btw, `StddevPop` is simply `Sqrt(VariancePop)` (and `StddevSamp = Sqrt(VarianceSamp)`). I'm not sure whether it can help simplify the code here.
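    
    The observation holds because all four flavors evaluate the same (count, m2) buffer and differ only in the final expression. A sketch of my own (not Spark code) to make that concrete:
    
    ```python
    import math
    
    # One shared buffer, four final expressions: population vs sample
    # variance, each optionally under a square root for stddev.
    def finalize(count, m2, population=True, sqrt=False):
        if population:
            v = m2 / count
        else:
            v = m2 / (count - 1.0)  # Bessel's correction for the sample form
        return math.sqrt(v) if sqrt else v
    ```
    
    So a shared `CentralMomentAgg` buffer plus a per-function `evaluateExpression` is enough to cover all of them.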



[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178522676
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50565/
    Test FAILed.



[GitHub] spark pull request: [SPARK-12963] Improve performance of stat func...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176498774
  
    @davies side note: The JIRA number is wrong.



[GitHub] spark pull request: [SPARK-12963] Improve performance of stddev/va...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176443933
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50297/
    Test FAILed.



[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176896570
  
    **[Test build #50388 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50388/consoleFull)** for PR 10960 at commit [`ab32659`](https://github.com/apache/spark/commit/ab326596c585b0f301251bb81108defbec8c9d39).



[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176877654
  
    Merged build finished. Test FAILed.



[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533788
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala ---
    @@ -436,3 +436,21 @@ case class Murmur3Hash(children: Seq[Expression], seed: Int) extends Expression
         }
       }
     }
    +
    +/**
    +  * An expression that will print the value of child to stderr (used for debugging codegen).
    +  */
    +case class Echo(child: Expression) extends UnaryExpression {
    --- End diff --
    
    I'm not sure whether `Echo` belongs to this PR. cc @nongli @rxin 



[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176877660
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50384/
    Test FAILed.



[GitHub] spark pull request: [SPARK-12963] Improve performance of stddev/va...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176382224
  
    **[Test build #50294 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50294/consoleFull)** for PR 10960 at commit [`ae83955`](https://github.com/apache/spark/commit/ae83955ea3a34e38ce55d99f741c99f1f8b2fa8f).



[GitHub] spark pull request: [SPARK-12963] Improve performance of stddev/va...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176043209
  
    **[Test build #50265 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50265/consoleFull)** for PR 10960 at commit [`3c8d737`](https://github.com/apache/spark/commit/3c8d737d5ee3ce34dee494dc3fac3090d983775a).



[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178448867
  
    Merged build finished. Test FAILed.



[GitHub] spark pull request: [SPARK-12963] Improve performance of stddev/va...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-175915582
  
    Merged build finished. Test FAILed.



[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-177066672
  
    **[Test build #50432 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50432/consoleFull)** for PR 10960 at commit [`7e57a1a`](https://github.com/apache/spark/commit/7e57a1ab87f27659fd11f63ee86dd073f9687dc9).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533802
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala ---
    @@ -59,6 +59,55 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
         benchmark.run()
       }
     
    +  def testStddev(values: Int): Unit = {
    +
    +    val benchmark = new Benchmark("stddev", values)
    +
    +    benchmark.addCase("stddev w/o codegen") { iter =>
    +      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
    +      sqlContext.range(values).groupBy().agg("id" -> "stddev").collect()
    +    }
    +
    +    benchmark.addCase("stddev w codegen") { iter =>
    +      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
    +      sqlContext.range(values).groupBy().agg("id" -> "stddev").collect()
    +    }
    +
    +    benchmark.addCase("kurtosis w/o codegen") { iter =>
    +      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
    +      sqlContext.range(values).groupBy().agg("id" -> "kurtosis").collect()
    +    }
    +
    +    benchmark.addCase("kurtosis w codegen") { iter =>
    +      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
    +      sqlContext.range(values).groupBy().agg("id" -> "kurtosis").collect()
    +    }
    +
    +
    +    /**
    +    Using ImperativeAggregate:
    --- End diff --
    
    Add ` as implemented in Spark 1.6`.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51547587
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -52,178 +50,162 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
       protected def momentOrder: Int
     
       override def children: Seq[Expression] = Seq(child)
    -
       override def nullable: Boolean = true
    -
       override def dataType: DataType = DoubleType
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val n = AttributeReference("n", DoubleType, nullable = false)()
    +  protected val avg = AttributeReference("avg", DoubleType, nullable = false)()
    +  protected val m2 = AttributeReference("m2", DoubleType, nullable = false)()
    +  protected val m3 = AttributeReference("m3", DoubleType, nullable = false)()
    +  protected val m4 = AttributeReference("m4", DoubleType, nullable = false)()
    +
    +  private def trimHigherOrder[T](expressions: Seq[T]) = expressions.take(momentOrder + 1)
    +
    +  override val aggBufferAttributes = trimHigherOrder(Seq(n, avg, m2, m3, m4))
    +
    +  override val initialValues: Seq[Expression] =
    +    trimHigherOrder(Seq(Literal(0.0), Literal(0.0), Literal(0.0), Literal(0.0), Literal(0.0)))
    --- End diff --
    
    Sorry for some miscommunication. The previous inline comments are useful here because `Lit(0.0)` carries no information. The comments are unnecessary only when the variable names clearly tell what the values are. Please restore the inline comments for the initial values.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-177066731
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50432/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178801443
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50572/
    Test PASSed.




[GitHub] spark pull request: [SPARK-12913] Improve performance of stat func...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176534002
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50322/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176901260
  
    **[Test build #50386 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50386/consoleFull)** for PR 10960 at commit [`383c193`](https://github.com/apache/spark/commit/383c193715df514e018497e459a1cf08600486f5).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class Corr(x: Expression, y: Expression) extends DeclarativeAggregate `
      * `abstract class Covariance(x: Expression, y: Expression) extends DeclarativeAggregate `




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533776
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Corr.scala ---
    @@ -29,165 +28,95 @@ import org.apache.spark.sql.types._
      * Definition of Pearson correlation can be found at
      * http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient
      */
    -case class Corr(
    -    left: Expression,
    -    right: Expression,
    -    mutableAggBufferOffset: Int = 0,
    -    inputAggBufferOffset: Int = 0)
    -  extends ImperativeAggregate {
    -
    -  def this(left: Expression, right: Expression) =
    -    this(left, right, mutableAggBufferOffset = 0, inputAggBufferOffset = 0)
    -
    -  override def children: Seq[Expression] = Seq(left, right)
    +case class Corr(x: Expression, y: Expression) extends DeclarativeAggregate {
     
    +  override def children: Seq[Expression] = Seq(x, y)
       override def nullable: Boolean = true
    -
       override def dataType: DataType = DoubleType
    -
       override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType, DoubleType)
     
    -  override def checkInputDataTypes(): TypeCheckResult = {
    -    if (left.dataType.isInstanceOf[DoubleType] && right.dataType.isInstanceOf[DoubleType]) {
    -      TypeCheckResult.TypeCheckSuccess
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val xAvg = AttributeReference("xAvg", DoubleType, nullable = false)()
    +  protected val yAvg = AttributeReference("yAvg", DoubleType, nullable = false)()
    +  protected val ck = AttributeReference("ck", DoubleType, nullable = false)()
    +  protected val xMk = AttributeReference("xMk", DoubleType, nullable = false)()
    +  protected val yMk = AttributeReference("yMk", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes: Seq[AttributeReference] = Seq(count, xAvg, yAvg, ck, xMk, yMk)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* xAvg = */ Literal(0.0),
    +    /* yAvg = */ Literal(0.0),
    +    /* ck = */ Literal(0.0),
    +    /* xMk = */ Literal(0.0),
    +    /* yMk = */ Literal(0.0)
    +  )
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    +    val dx = x - xAvg
    +    val dxN = dx / n
    +    val dy = y - yAvg
    +    val dyN = dy / n
    +    val newXAvg = xAvg + dxN
    +    val newYAvg = yAvg + dyN
    +    val newCk = ck + dx * (dy - dyN)
    --- End diff --
    
    This is equivalent to the original update rule. However, for reference purposes we shouldn't change it, so that readers can easily map the formula on the Wikipedia page to the implementation here. Shall we change it back to `y - newYAvg`? The same applies to the two lines below.
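
The two forms are indeed algebraically identical: since `newYAvg = yAvg + dyN`, we have `dy - dyN = (y - yAvg) - dyN = y - newYAvg`. A quick numeric sanity check (plain Python for illustration, not the Catalyst expressions; the sample values are arbitrary):

```python
# Check that dy - dyN == y - newYAvg in the Corr update rule.
y, y_avg, count = 3.7, 1.2, 4.0   # arbitrary current value and buffer state

n = count + 1.0
dy = y - y_avg
dy_n = dy / n
new_y_avg = y_avg + dy_n

lhs = dy - dy_n        # form used in this PR:  ck + dx * (dy - dyN)
rhs = y - new_y_avg    # form on the Wikipedia page
assert abs(lhs - rhs) < 1e-12
```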




[GitHub] spark pull request: [SPARK-12963] Improve performance of stddev/va...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176383091
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50294/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533702
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -57,173 +55,170 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
     
       override def dataType: DataType = DoubleType
     
    -  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val avg = AttributeReference("avg", DoubleType, nullable = false)()
    +  protected val m2 = AttributeReference("m2", DoubleType, nullable = false)()
    +  protected val m3 = AttributeReference("m3", DoubleType, nullable = false)()
    +  protected val m4 = AttributeReference("m4", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes = Seq(count, avg, m2, m3, m4).take(momentOrder + 1)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* avg = */ Literal(0.0),
    +    /* m2 = */ Literal(0.0),
    +    /* m3 = */ Literal(0.0),
    +    /* m4 = */ Literal(0.0)
    +  ).take(momentOrder + 1)
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    +    val delta = child - avg
    +    val deltaN = delta / n
    +    val newAvg = avg + deltaN
    +    val newM2 = m2 + delta * (delta - deltaN)
    +
    +    val delta2 = delta * delta
    +    val deltaN2 = deltaN * deltaN
    +    val newM3 = m3 - Literal(3.0) * deltaN * newM2 + delta * (delta2 - deltaN2)
    +
    +    val newM4 = m4 - Literal(4.0) * deltaN * newM3 - Literal(6.0) * deltaN2 * newM2 +
    +      delta * (delta * delta2 - deltaN * deltaN2)
    +
    +    if (child.nullable) {
    --- End diff --
    
    It might be easier to understand if we write
    
    ~~~scala
    trimHigherOrder(
      If(IsNull(child),
        Seq(count, avg, m2, m3, m4),
        Seq(n, newAvg, newM2, newM3, newM4)))
    ~~~
    
    Please let me know if it doesn't work. Btw, I guess `if (child.nullable)` is automatically checked by `IsNull` (https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala#L74). Then we can remove that check to simplify the code here.
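
For reference, the update recurrences in the diff are Welford's online algorithm. A minimal Python sketch of the count/avg/m2 part (names mirror the diff, but this is an illustration of the recurrence, not the Catalyst code):

```python
def update(count, avg, m2, x):
    """One Welford step: running count, mean, and sum of squared deviations."""
    n = count + 1.0
    delta = x - avg          # deviation from the old mean
    delta_n = delta / n
    new_avg = avg + delta_n
    new_m2 = m2 + delta * (delta - delta_n)
    return n, new_avg, new_m2

state = (0.0, 0.0, 0.0)      # matches the all-zero initialValues
for x in [1.0, 2.0, 3.0, 4.0]:
    state = update(*state, x)

count, avg, m2 = state
# For [1, 2, 3, 4]: count = 4.0, avg = 2.5, m2 = 5.0,
# so the population variance is m2 / count = 1.25.
```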




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176901501
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533703
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -57,173 +55,170 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
     
       override def dataType: DataType = DoubleType
     
    -  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val avg = AttributeReference("avg", DoubleType, nullable = false)()
    +  protected val m2 = AttributeReference("m2", DoubleType, nullable = false)()
    +  protected val m3 = AttributeReference("m3", DoubleType, nullable = false)()
    +  protected val m4 = AttributeReference("m4", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes = Seq(count, avg, m2, m3, m4).take(momentOrder + 1)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* avg = */ Literal(0.0),
    +    /* m2 = */ Literal(0.0),
    +    /* m3 = */ Literal(0.0),
    +    /* m4 = */ Literal(0.0)
    +  ).take(momentOrder + 1)
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    +    val delta = child - avg
    +    val deltaN = delta / n
    +    val newAvg = avg + deltaN
    +    val newM2 = m2 + delta * (delta - deltaN)
    +
    +    val delta2 = delta * delta
    +    val deltaN2 = deltaN * deltaN
    +    val newM3 = m3 - Literal(3.0) * deltaN * newM2 + delta * (delta2 - deltaN2)
    +
    +    val newM4 = m4 - Literal(4.0) * deltaN * newM3 - Literal(6.0) * deltaN2 * newM2 +
    +      delta * (delta * delta2 - deltaN * deltaN2)
    +
    +    if (child.nullable) {
    +      Seq(
    +        /* count = */ If(IsNull(child), count, n),
    --- End diff --
    
    The comment `/* count = */` seems unnecessary to me because it is clear from the variable name.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51534054
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -57,173 +55,170 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
     
       override def dataType: DataType = DoubleType
     
    -  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val avg = AttributeReference("avg", DoubleType, nullable = false)()
    +  protected val m2 = AttributeReference("m2", DoubleType, nullable = false)()
    +  protected val m3 = AttributeReference("m3", DoubleType, nullable = false)()
    +  protected val m4 = AttributeReference("m4", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes = Seq(count, avg, m2, m3, m4).take(momentOrder + 1)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* avg = */ Literal(0.0),
    +    /* m2 = */ Literal(0.0),
    +    /* m3 = */ Literal(0.0),
    +    /* m4 = */ Literal(0.0)
    +  ).take(momentOrder + 1)
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    +    val delta = child - avg
    +    val deltaN = delta / n
    +    val newAvg = avg + deltaN
    +    val newM2 = m2 + delta * (delta - deltaN)
    +
    +    val delta2 = delta * delta
    +    val deltaN2 = deltaN * deltaN
    +    val newM3 = m3 - Literal(3.0) * deltaN * newM2 + delta * (delta2 - deltaN2)
    +
    +    val newM4 = m4 - Literal(4.0) * deltaN * newM3 - Literal(6.0) * deltaN2 * newM2 +
    +      delta * (delta * delta2 - deltaN * deltaN2)
    +
    +    if (child.nullable) {
    +      Seq(
    +        /* count = */ If(IsNull(child), count, n),
    +        /* avg = */ If(IsNull(child), avg, newAvg),
    +        /* m2 = */ If(IsNull(child), m2, newM2),
    +        /* m3 = */ If(IsNull(child), m3, newM3),
    +        /* m4 = */ If(IsNull(child), m4, newM4)
    +      ).take(momentOrder + 1)
    +    } else {
    +      Seq(
    +        /* count = */ n,
    +        /* avg = */ newAvg,
    +        /* m2 = */ newM2,
    +        /* m3 = */ newM3,
    +        /* m4 = */ newM4
    +      ).take(momentOrder + 1)
    +    }
    +  }
     
    -  override def checkInputDataTypes(): TypeCheckResult =
    -    TypeUtils.checkForNumericExpr(child.dataType, s"function $prettyName")
    +  override lazy val mergeExpressions: Seq[Expression] = {
     
    -  override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
    +    val n1 = count.left
    +    val n2 = count.right
    +    val n = n1 + n2
    +    val delta = avg.right - avg.left
    +    val deltaN = If(EqualTo(n, Literal(0.0)), Literal(0.0), delta / n)
    +    val newAvg = avg.left + deltaN * n2
     
    -  /**
    -   * Size of aggregation buffer.
    -   */
    -  private[this] val bufferSize = 5
    +    // higher order moments computed according to:
    +    // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
    +    val newM2 = m2.left + m2.right + delta * deltaN * n1 * n2
    +    val newM3 = if (momentOrder >= 3) {
    --- End diff --
    
    Then shall we make `updateExpression` follow the same style?
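
The pairwise merge in the diff follows the combining formulas from the Wikipedia page it links. An illustrative Python sketch of the count/avg/m2 merge (not the Catalyst code; note the division guard mirroring `If(EqualTo(n, Literal(0.0)), ...)` for two empty buffers):

```python
def merge(a, b):
    """Merge two (count, mean, m2) partial aggregates."""
    n1, avg1, m2_1 = a
    n2, avg2, m2_2 = b
    n = n1 + n2
    delta = avg2 - avg1
    delta_n = 0.0 if n == 0.0 else delta / n   # guard: both buffers empty
    new_avg = avg1 + delta_n * n2
    new_m2 = m2_1 + m2_2 + delta * delta_n * n1 * n2
    return n, new_avg, new_m2

# Partial states for [1, 2] and [3, 4]:
left = (2.0, 1.5, 0.5)
right = (2.0, 3.5, 0.5)
count, avg, m2 = merge(left, right)
# Same result as a single pass over [1, 2, 3, 4]: (4.0, 2.5, 5.0).
```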




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533733
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -57,173 +55,170 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
     
       override def dataType: DataType = DoubleType
     
    -  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val avg = AttributeReference("avg", DoubleType, nullable = false)()
    +  protected val m2 = AttributeReference("m2", DoubleType, nullable = false)()
    +  protected val m3 = AttributeReference("m3", DoubleType, nullable = false)()
    +  protected val m4 = AttributeReference("m4", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes = Seq(count, avg, m2, m3, m4).take(momentOrder + 1)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* avg = */ Literal(0.0),
    +    /* m2 = */ Literal(0.0),
    +    /* m3 = */ Literal(0.0),
    +    /* m4 = */ Literal(0.0)
    +  ).take(momentOrder + 1)
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    +    val delta = child - avg
    +    val deltaN = delta / n
    +    val newAvg = avg + deltaN
    +    val newM2 = m2 + delta * (delta - deltaN)
    +
    +    val delta2 = delta * delta
    +    val deltaN2 = deltaN * deltaN
    +    val newM3 = m3 - Literal(3.0) * deltaN * newM2 + delta * (delta2 - deltaN2)
    +
    +    val newM4 = m4 - Literal(4.0) * deltaN * newM3 - Literal(6.0) * deltaN2 * newM2 +
    +      delta * (delta * delta2 - deltaN * deltaN2)
    +
    +    if (child.nullable) {
    +      Seq(
    +        /* count = */ If(IsNull(child), count, n),
    --- End diff --
    
    Please also remove comments when the variable names provide sufficient info.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533696
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -57,173 +55,170 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
     
       override def dataType: DataType = DoubleType
     
    -  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    --- End diff --
    
    Shall we call it `n`? Otherwise, we have both `count` and `n` in the code. It is not clear that `n` means `newCount`. If we rename it, we can use `n` and `newN`.




[GitHub] spark pull request: [SPARK-12963] Improve performance of stddev/va...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176068652
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533777
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Corr.scala ---
    @@ -29,165 +28,95 @@ import org.apache.spark.sql.types._
      * Definition of Pearson correlation can be found at
      * http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient
      */
    -case class Corr(
    -    left: Expression,
    -    right: Expression,
    -    mutableAggBufferOffset: Int = 0,
    -    inputAggBufferOffset: Int = 0)
    -  extends ImperativeAggregate {
    -
    -  def this(left: Expression, right: Expression) =
    -    this(left, right, mutableAggBufferOffset = 0, inputAggBufferOffset = 0)
    -
    -  override def children: Seq[Expression] = Seq(left, right)
    +case class Corr(x: Expression, y: Expression) extends DeclarativeAggregate {
     
    +  override def children: Seq[Expression] = Seq(x, y)
       override def nullable: Boolean = true
    -
       override def dataType: DataType = DoubleType
    -
       override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType, DoubleType)
     
    -  override def checkInputDataTypes(): TypeCheckResult = {
    -    if (left.dataType.isInstanceOf[DoubleType] && right.dataType.isInstanceOf[DoubleType]) {
    -      TypeCheckResult.TypeCheckSuccess
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val xAvg = AttributeReference("xAvg", DoubleType, nullable = false)()
    +  protected val yAvg = AttributeReference("yAvg", DoubleType, nullable = false)()
    +  protected val ck = AttributeReference("ck", DoubleType, nullable = false)()
    +  protected val xMk = AttributeReference("xMk", DoubleType, nullable = false)()
    +  protected val yMk = AttributeReference("yMk", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes: Seq[AttributeReference] = Seq(count, xAvg, yAvg, ck, xMk, yMk)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* xAvg = */ Literal(0.0),
    +    /* yAvg = */ Literal(0.0),
    +    /* ck = */ Literal(0.0),
    +    /* xMk = */ Literal(0.0),
    +    /* yMk = */ Literal(0.0)
    +  )
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    +    val dx = x - xAvg
    +    val dxN = dx / n
    +    val dy = y - yAvg
    +    val dyN = dy / n
    +    val newXAvg = xAvg + dxN
    +    val newYAvg = yAvg + dyN
    +    val newCk = ck + dx * (dy - dyN)
    +    val newXMk = xMk + dx * (dx - dxN)
    +    val newYMk = yMk + dy * (dy - dyN)
    +
    +    val isNull = Or(IsNull(x), IsNull(y))
    --- End diff --
    
    Similar here. Is it the same as the following?
    
    ~~~scala
    If(IsNull(x) || IsNull(y),
      Seq(count, xAvg, yAvg, ck, xMk, yMk),
      Seq(n, newXAvg, newYAvg, newCk, newXMk, newYMk))
    ~~~
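
For context, the buffer keeps a running co-moment `ck = sum((x - xAvg) * (y - yAvg))` alongside the two marginal sums of squared deviations, so the final result is `ck / sqrt(xMk * yMk)`. A Python sketch of the non-null update path (an illustration of the recurrence, not the Catalyst expressions):

```python
import math

def update(state, x, y):
    """One online update of the correlation buffer (non-null inputs)."""
    count, x_avg, y_avg, ck, x_mk, y_mk = state
    n = count + 1.0
    dx = x - x_avg
    dy = y - y_avg
    dx_n = dx / n
    dy_n = dy / n
    return (n,
            x_avg + dx_n,
            y_avg + dy_n,
            ck + dx * (dy - dy_n),      # co-moment
            x_mk + dx * (dx - dx_n),    # x sum of squared deviations
            y_mk + dy * (dy - dy_n))    # y sum of squared deviations

state = (0.0,) * 6
for x, y in [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0)]:   # y = 2x, perfectly linear
    state = update(state, x, y)

count, x_avg, y_avg, ck, x_mk, y_mk = state
corr = ck / math.sqrt(x_mk * y_mk)   # 1.0 for perfectly correlated data
```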




[GitHub] spark pull request: [SPARK-12963] Improve performance of stddev/va...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-175915586
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50239/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176949999
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50388/
    Test PASSed.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176949613
  
    **[Test build #50388 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50388/consoleFull)** for PR 10960 at commit [`ab32659`](https://github.com/apache/spark/commit/ab326596c585b0f301251bb81108defbec8c9d39).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178522351
  
    **[Test build #50565 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50565/consoleFull)** for PR 10960 at commit [`9b74195`](https://github.com/apache/spark/commit/9b74195237cccfe23463e737687e7e06331e769f).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class PrintToStderr(child: Expression) extends UnaryExpression `




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533789
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala ---
    @@ -244,7 +244,7 @@ case class WholeStageCodegen(plan: CodegenSupport, children: Seq[SparkPlan])
     
         // try to compile, helpful for debug
         // println(s"${CodeFormatter.format(source)}")
    -    CodeGenerator.compile(source)
    +    // CodeGenerator.compile(source)
    --- End diff --
    
    Any reason to comment this line out?




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533758
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -57,173 +55,170 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
     
       override def dataType: DataType = DoubleType
     
    -  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val avg = AttributeReference("avg", DoubleType, nullable = false)()
    +  protected val m2 = AttributeReference("m2", DoubleType, nullable = false)()
    +  protected val m3 = AttributeReference("m3", DoubleType, nullable = false)()
    +  protected val m4 = AttributeReference("m4", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes = Seq(count, avg, m2, m3, m4).take(momentOrder + 1)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* avg = */ Literal(0.0),
    +    /* m2 = */ Literal(0.0),
    +    /* m3 = */ Literal(0.0),
    +    /* m4 = */ Literal(0.0)
    +  ).take(momentOrder + 1)
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    +    val delta = child - avg
    +    val deltaN = delta / n
    +    val newAvg = avg + deltaN
    +    val newM2 = m2 + delta * (delta - deltaN)
    +
    +    val delta2 = delta * delta
    +    val deltaN2 = deltaN * deltaN
    +    val newM3 = m3 - Literal(3.0) * deltaN * newM2 + delta * (delta2 - deltaN2)
    +
    +    val newM4 = m4 - Literal(4.0) * deltaN * newM3 - Literal(6.0) * deltaN2 * newM2 +
    +      delta * (delta * delta2 - deltaN * deltaN2)
    +
    +    if (child.nullable) {
    +      Seq(
    +        /* count = */ If(IsNull(child), count, n),
    +        /* avg = */ If(IsNull(child), avg, newAvg),
    +        /* m2 = */ If(IsNull(child), m2, newM2),
    +        /* m3 = */ If(IsNull(child), m3, newM3),
    +        /* m4 = */ If(IsNull(child), m4, newM4)
    +      ).take(momentOrder + 1)
    +    } else {
    +      Seq(
    +        /* count = */ n,
    +        /* avg = */ newAvg,
    +        /* m2 = */ newM2,
    +        /* m3 = */ newM3,
    +        /* m4 = */ newM4
    +      ).take(momentOrder + 1)
    +    }
    +  }
     
    -  override def checkInputDataTypes(): TypeCheckResult =
    -    TypeUtils.checkForNumericExpr(child.dataType, s"function $prettyName")
    +  override lazy val mergeExpressions: Seq[Expression] = {
     
    -  override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
    +    val n1 = count.left
    +    val n2 = count.right
    +    val n = n1 + n2
    +    val delta = avg.right - avg.left
    +    val deltaN = If(EqualTo(n, Literal(0.0)), Literal(0.0), delta / n)
    +    val newAvg = avg.left + deltaN * n2
     
    -  /**
    -   * Size of aggregation buffer.
    -   */
    -  private[this] val bufferSize = 5
    +    // higher order moments computed according to:
    +    // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
    +    val newM2 = m2.left + m2.right + delta * deltaN * n1 * n2
    +    val newM3 = if (momentOrder >= 3) {
    +      m3.left + m3.right + deltaN * deltaN * delta * n1 * n2 * (n1 - n2) +
    +        Literal(3.0) * deltaN * (n1 * m2.right - n2 * m2.left)
    +    } else {
    +      Literal(0.0)
    +    }
    +    val newM4 = if (momentOrder >= 4) {
    +      m4.left + m4.right +
    +        deltaN * deltaN * deltaN * delta * n1 * n2 * (n1 * n1 - n1 * n2 + n2 * n2) +
    +        Literal(6.0) * deltaN * deltaN * (n1 * n1 * m2.right + n2 * n2 * m2.left) +
    +        Literal(4.0) * deltaN * (n1 * m3.right - n2 * m3.left)
    +    } else {
    +      Literal(0.0)
    +    }
     
    -  override val aggBufferAttributes: Seq[AttributeReference] = Seq.tabulate(bufferSize) { i =>
    -    AttributeReference(s"M$i", DoubleType)()
    +    Seq(
    +      /* count = */ n,
    +      /* avg = */ newAvg,
    +      /* m2 = */ newM2,
    +      /* m3 = */ newM3,
    +      /* m4 = */ newM4
    +    ).take(momentOrder + 1)
       }
    +}
     
    -  // Note: although this simply copies aggBufferAttributes, this common code can not be placed
    -  // in the superclass because that will lead to initialization ordering issues.
    -  override val inputAggBufferAttributes: Seq[AttributeReference] =
    -    aggBufferAttributes.map(_.newInstance())
    -
    -  // buffer offsets
    -  private[this] val nOffset = mutableAggBufferOffset
    -  private[this] val meanOffset = mutableAggBufferOffset + 1
    -  private[this] val secondMomentOffset = mutableAggBufferOffset + 2
    -  private[this] val thirdMomentOffset = mutableAggBufferOffset + 3
    -  private[this] val fourthMomentOffset = mutableAggBufferOffset + 4
    -
    -  // frequently used values for online updates
    -  private[this] var delta = 0.0
    -  private[this] var deltaN = 0.0
    -  private[this] var delta2 = 0.0
    -  private[this] var deltaN2 = 0.0
    -  private[this] var n = 0.0
    -  private[this] var mean = 0.0
    -  private[this] var m2 = 0.0
    -  private[this] var m3 = 0.0
    -  private[this] var m4 = 0.0
    +// Compute the population standard deviation of a column
    +case class StddevPop(child: Expression) extends CentralMomentAgg(child) {
     
    -  /**
    -   * Initialize all moments to zero.
    -   */
    -  override def initialize(buffer: MutableRow): Unit = {
    -    for (aggIndex <- 0 until bufferSize) {
    -      buffer.setDouble(mutableAggBufferOffset + aggIndex, 0.0)
    -    }
    +  override protected def momentOrder = 2
    +
    +  override val evaluateExpression: Expression = {
    +    If(EqualTo(count, Literal(0.0)), Literal.create(null, DoubleType),
    --- End diff --
    
    Using `===` instead?
    
    ~~~scala
    If(count === Literal(0.0), Literal.create(null, DoubleType), Sqrt(m2 / count))
    ~~~
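
For reference, the `evaluateExpression` being quoted reduces to the following logic (a Python sketch under the buffer layout from the diff, not the actual Catalyst expression): NULL for an empty group, otherwise `sqrt(m2 / count)`.

```python
import math

def stddev_pop(count, m2):
    # Mirrors the quoted evaluateExpression: NULL (None here) when the
    # group is empty, otherwise the population stddev sqrt(m2 / count).
    if count == 0.0:
        return None
    return math.sqrt(m2 / count)

# Build (count, m2) with the Welford-style update for [1, 2, 3, 4];
# the population variance of this data is 5/4.
count = avg = m2 = 0.0
for x in [1.0, 2.0, 3.0, 4.0]:
    count += 1.0
    delta = x - avg
    avg += delta / count
    m2 += delta * (delta - delta / count)

result = stddev_pop(count, m2)
```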




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178442167
  
    @mengxr Thanks for reviewing this; I should have addressed all your comments.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533775
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Corr.scala ---
    @@ -29,165 +28,95 @@ import org.apache.spark.sql.types._
      * Definition of Pearson correlation can be found at
      * http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient
      */
    -case class Corr(
    -    left: Expression,
    -    right: Expression,
    -    mutableAggBufferOffset: Int = 0,
    -    inputAggBufferOffset: Int = 0)
    -  extends ImperativeAggregate {
    -
    -  def this(left: Expression, right: Expression) =
    -    this(left, right, mutableAggBufferOffset = 0, inputAggBufferOffset = 0)
    -
    -  override def children: Seq[Expression] = Seq(left, right)
    +case class Corr(x: Expression, y: Expression) extends DeclarativeAggregate {
     
    +  override def children: Seq[Expression] = Seq(x, y)
       override def nullable: Boolean = true
    -
       override def dataType: DataType = DoubleType
    -
       override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType, DoubleType)
     
    -  override def checkInputDataTypes(): TypeCheckResult = {
    -    if (left.dataType.isInstanceOf[DoubleType] && right.dataType.isInstanceOf[DoubleType]) {
    -      TypeCheckResult.TypeCheckSuccess
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val xAvg = AttributeReference("xAvg", DoubleType, nullable = false)()
    +  protected val yAvg = AttributeReference("yAvg", DoubleType, nullable = false)()
    +  protected val ck = AttributeReference("ck", DoubleType, nullable = false)()
    +  protected val xMk = AttributeReference("xMk", DoubleType, nullable = false)()
    +  protected val yMk = AttributeReference("yMk", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes: Seq[AttributeReference] = Seq(count, xAvg, yAvg, ck, xMk, yMk)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* xAvg = */ Literal(0.0),
    +    /* yAvg = */ Literal(0.0),
    +    /* ck = */ Literal(0.0),
    +    /* xMk = */ Literal(0.0),
    +    /* yMk = */ Literal(0.0)
    +  )
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    --- End diff --
    
    Same argument on `count` and `n` usage. Calling them `n` and `newN` might be better.




[GitHub] spark pull request: [SPARK-12963] Improve performance of stddev/va...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51086288
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -109,7 +109,7 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
        * Update the central moments buffer.
        */
       override def update(buffer: MutableRow, input: InternalRow): Unit = {
    -    val v = Cast(child, DoubleType).eval(input)
    --- End diff --
    
    Creating a `Cast()` here is very expensive.




[GitHub] spark pull request: [SPARK-12963] Improve performance of stddev/va...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51160846
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -125,19 +125,15 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
           mean += deltaN
           buffer.setDouble(meanOffset, mean)
     
    -      if (momentOrder >= 2) {
    -        m2 = buffer.getDouble(secondMomentOffset)
    -        m2 += delta * (delta - deltaN)
    -        buffer.setDouble(secondMomentOffset, m2)
    -      }
    +      m2 = buffer.getDouble(secondMomentOffset)
    +      m2 += delta * (delta - deltaN)
    +      buffer.setDouble(secondMomentOffset, m2)
     
    -      if (momentOrder >= 3) {
    --- End diff --
    
    Those `if` branches are important to save computation for low-order statistics. Even if we won't use `CentralMomentAgg` for second-order statistics, it is still good to keep them.
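
The saving these guards buy can be made concrete with a small sketch (illustrative Python, assuming Welford-style updates as in the surrounding diff): with `momentOrder == 2`, none of the third- or fourth-moment arithmetic runs at all.

```python
def update_moments(buf, x, moment_order):
    # buf = (n, avg, m2, m3, m4). The `if moment_order >= k` guards below
    # play the role of the `if (momentOrder >= 3/4)` branches: for
    # low-order statistics the higher-moment arithmetic never executes.
    n0, avg, m2, m3, m4 = buf
    n = n0 + 1.0
    delta = x - avg
    delta_n = delta / n
    new_avg = avg + delta_n
    new_m2 = m2 + delta * (delta - delta_n)
    new_m3, new_m4 = m3, m4
    if moment_order >= 3:
        delta2 = delta * delta
        delta_n2 = delta_n * delta_n
        new_m3 = m3 - 3.0 * delta_n * new_m2 + delta * (delta2 - delta_n2)
        if moment_order >= 4:
            new_m4 = (m4 - 4.0 * delta_n * new_m3 - 6.0 * delta_n2 * new_m2
                      + delta * (delta * delta2 - delta_n * delta_n2))
    return (n, new_avg, new_m2, new_m3, new_m4)

buf = (0.0,) * 5
for x in [1.0, 2.0, 3.0, 4.0]:
    buf = update_moments(buf, x, moment_order=2)
```

With `moment_order=2` the `m3` and `m4` slots stay at their initial zeros while `n`, `avg`, and `m2` are maintained as usual.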




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51534597
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -57,173 +55,170 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
     
       override def dataType: DataType = DoubleType
     
    -  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val avg = AttributeReference("avg", DoubleType, nullable = false)()
    +  protected val m2 = AttributeReference("m2", DoubleType, nullable = false)()
    +  protected val m3 = AttributeReference("m3", DoubleType, nullable = false)()
    +  protected val m4 = AttributeReference("m4", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes = Seq(count, avg, m2, m3, m4).take(momentOrder + 1)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* avg = */ Literal(0.0),
    +    /* m2 = */ Literal(0.0),
    +    /* m3 = */ Literal(0.0),
    +    /* m4 = */ Literal(0.0)
    +  ).take(momentOrder + 1)
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    +    val delta = child - avg
    +    val deltaN = delta / n
    +    val newAvg = avg + deltaN
    +    val newM2 = m2 + delta * (delta - deltaN)
    +
    +    val delta2 = delta * delta
    +    val deltaN2 = deltaN * deltaN
    +    val newM3 = m3 - Literal(3.0) * deltaN * newM2 + delta * (delta2 - deltaN2)
    +
    +    val newM4 = m4 - Literal(4.0) * deltaN * newM3 - Literal(6.0) * deltaN2 * newM2 +
    +      delta * (delta * delta2 - deltaN * deltaN2)
    +
    +    if (child.nullable) {
    +      Seq(
    +        /* count = */ If(IsNull(child), count, n),
    +        /* avg = */ If(IsNull(child), avg, newAvg),
    +        /* m2 = */ If(IsNull(child), m2, newM2),
    +        /* m3 = */ If(IsNull(child), m3, newM3),
    +        /* m4 = */ If(IsNull(child), m4, newM4)
    +      ).take(momentOrder + 1)
    +    } else {
    +      Seq(
    +        /* count = */ n,
    +        /* avg = */ newAvg,
    +        /* m2 = */ newM2,
    +        /* m3 = */ newM3,
    +        /* m4 = */ newM4
    +      ).take(momentOrder + 1)
    +    }
    +  }
     
    -  override def checkInputDataTypes(): TypeCheckResult =
    -    TypeUtils.checkForNumericExpr(child.dataType, s"function $prettyName")
    +  override lazy val mergeExpressions: Seq[Expression] = {
     
    -  override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
    +    val n1 = count.left
    +    val n2 = count.right
    +    val n = n1 + n2
    +    val delta = avg.right - avg.left
    +    val deltaN = If(EqualTo(n, Literal(0.0)), Literal(0.0), delta / n)
    +    val newAvg = avg.left + deltaN * n2
     
    -  /**
    -   * Size of aggregation buffer.
    -   */
    -  private[this] val bufferSize = 5
    +    // higher order moments computed according to:
    +    // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
    +    val newM2 = m2.left + m2.right + delta * deltaN * n1 * n2
    +    val newM3 = if (momentOrder >= 3) {
    --- End diff --
    
    I understand that the guard in `updateExpression` is not necessary. This is just to make the code more consistent. For example, I figured out why this is not necessary in `updateExpression`, then I was confused by why we use the `if` branches here. The logic would be clearer if we used `if` branches in both methods.
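
For the second-order case, the merge being quoted corresponds to the pairwise-combination formula (Chan et al.); a small Python sketch of it, not the Catalyst code:

```python
def merge_moments(a, b):
    # Merge two (n, avg, m2) partial buffers; mirrors the quoted
    # mergeExpressions for momentOrder == 2, including the deltaN
    # guard that keeps a merge of two empty buffers well-defined.
    n1, avg1, m2_1 = a
    n2, avg2, m2_2 = b
    n = n1 + n2
    delta = avg2 - avg1
    delta_n = 0.0 if n == 0.0 else delta / n
    new_avg = avg1 + delta_n * n2
    new_m2 = m2_1 + m2_2 + delta * delta_n * n1 * n2
    return (n, new_avg, new_m2)

# Partition [1..6] into two partial buffers and merge them.
left = (3.0, 2.0, 2.0)    # n=3, avg=2, m2 = (1-2)^2 + (2-2)^2 + (3-2)^2
right = (3.0, 5.0, 2.0)   # n=3, avg=5, m2 = (4-5)^2 + (5-5)^2 + (6-5)^2
merged = merge_moments(left, right)
```

The merged buffer matches what a single pass over [1..6] would produce: mean 3.5 and a second central moment of 17.5.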




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51547595
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -52,178 +50,162 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
       protected def momentOrder: Int
     
       override def children: Seq[Expression] = Seq(child)
    -
       override def nullable: Boolean = true
    -
       override def dataType: DataType = DoubleType
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val n = AttributeReference("n", DoubleType, nullable = false)()
    +  protected val avg = AttributeReference("avg", DoubleType, nullable = false)()
    +  protected val m2 = AttributeReference("m2", DoubleType, nullable = false)()
    +  protected val m3 = AttributeReference("m3", DoubleType, nullable = false)()
    +  protected val m4 = AttributeReference("m4", DoubleType, nullable = false)()
    +
    +  private def trimHigherOrder[T](expressions: Seq[T]) = expressions.take(momentOrder + 1)
    +
    +  override val aggBufferAttributes = trimHigherOrder(Seq(n, avg, m2, m3, m4))
    +
    +  override val initialValues: Seq[Expression] =
    +    trimHigherOrder(Seq(Literal(0.0), Literal(0.0), Literal(0.0), Literal(0.0), Literal(0.0)))
    +
    +  override val updateExpressions: Seq[Expression] = {
    +    val newN = n + Literal(1.0)
    +    val delta = child - avg
    +    val deltaN = delta / newN
    +    val newAvg = avg + deltaN
    +    val newM2 = m2 + delta * (delta - deltaN)
    +
    +    val delta2 = delta * delta
    +    val deltaN2 = deltaN * deltaN
    +    val newM3 = if (momentOrder >= 3) {
    +      m3 - Literal(3.0) * deltaN * newM2 + delta * (delta2 - deltaN2)
    +    } else {
    +      Literal(0.0)
    +    }
    +    val newM4 = if (momentOrder >= 4) {
    +      m4 - Literal(4.0) * deltaN * newM3 - Literal(6.0) * deltaN2 * newM2 +
    +        delta * (delta * delta2 - deltaN * deltaN2)
    +    } else {
    +      Literal(0.0)
    +    }
     
    -  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
    +    trimHigherOrder(Seq(
    +      If(IsNull(child), n, newN),
    +      If(IsNull(child), avg, newAvg),
    +      If(IsNull(child), m2, newM2),
    +      If(IsNull(child), m3, newM3),
    +      If(IsNull(child), m4, newM4)
    +    ))
    +  }
     
    -  override def checkInputDataTypes(): TypeCheckResult =
    -    TypeUtils.checkForNumericExpr(child.dataType, s"function $prettyName")
    +  override val mergeExpressions: Seq[Expression] = {
     
    -  override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
    +    val n1 = n.left
    +    val n2 = n.right
    +    val newN = n1 + n2
    +    val delta = avg.right - avg.left
    +    val deltaN = If(EqualTo(newN, Literal(0.0)), Literal(0.0), delta / newN)
    --- End diff --
    
    `EqualTo` => `===` (please check other occurrences) 




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178456182
  
    **[Test build #50560 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50560/consoleFull)** for PR 10960 at commit [`9b74195`](https://github.com/apache/spark/commit/9b74195237cccfe23463e737687e7e06331e769f).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class PrintToStderr(child: Expression) extends UnaryExpression `




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533807
  
    --- Diff: sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala ---
    @@ -325,6 +325,10 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
         "drop_partitions_ignore_protection",
         "protectmode",
     
    +    // Difference accuracy (we are better)
    --- End diff --
    
    Could you elaborate? For example, we could say that the Hive implementation has numerical stability issues.
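
One way to make that concrete (a toy Python demonstration, not Hive's actual code): the textbook sum-of-squares variance formula loses all precision when the mean dominates the spread, while the single-pass Welford update used in this PR does not.

```python
def var_naive(xs):
    # "Sum of squares" population variance: E[x^2] - E[x]^2.
    # Suffers catastrophic cancellation when the mean dwarfs the spread.
    n = float(len(xs))
    mean = sum(xs) / n
    return sum(x * x for x in xs) / n - mean * mean

def var_welford(xs):
    # Single-pass Welford update, as in the PR's updateExpressions.
    n = avg = m2 = 0.0
    for x in xs:
        n += 1.0
        delta = x - avg
        avg += delta / n
        m2 += delta * (delta - delta / n)
    return m2 / n

data = [1e9, 1e9 + 1.0, 1e9 + 2.0]   # true population variance: 2/3
```

On this data `var_naive` is off by the full variance, while `var_welford` stays accurate.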




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51607516
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -52,178 +50,162 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
       protected def momentOrder: Int
     
       override def children: Seq[Expression] = Seq(child)
    -
       override def nullable: Boolean = true
    -
       override def dataType: DataType = DoubleType
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val n = AttributeReference("n", DoubleType, nullable = false)()
    +  protected val avg = AttributeReference("avg", DoubleType, nullable = false)()
    +  protected val m2 = AttributeReference("m2", DoubleType, nullable = false)()
    +  protected val m3 = AttributeReference("m3", DoubleType, nullable = false)()
    +  protected val m4 = AttributeReference("m4", DoubleType, nullable = false)()
    +
    +  private def trimHigherOrder[T](expressions: Seq[T]) = expressions.take(momentOrder + 1)
    +
    +  override val aggBufferAttributes = trimHigherOrder(Seq(n, avg, m2, m3, m4))
    +
    +  override val initialValues: Seq[Expression] =
    +    trimHigherOrder(Seq(Literal(0.0), Literal(0.0), Literal(0.0), Literal(0.0), Literal(0.0)))
    --- End diff --
    
    There is no difference between these initial values; the order does not matter here. Do you still think we should keep those comments? Or should I change it to use `fill()`?




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533782
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala ---
    @@ -28,171 +26,90 @@ import org.apache.spark.sql.types._
      * When applied on empty data (i.e., count is zero), it returns NULL.
      *
      */
    -abstract class Covariance(left: Expression, right: Expression) extends ImperativeAggregate
    -    with Serializable {
    -  override def children: Seq[Expression] = Seq(left, right)
    +abstract class Covariance(x: Expression, y: Expression) extends DeclarativeAggregate {
     
    +  override def children: Seq[Expression] = Seq(x, y)
       override def nullable: Boolean = true
    -
       override def dataType: DataType = DoubleType
    -
       override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType, DoubleType)
     
    -  override def checkInputDataTypes(): TypeCheckResult = {
    -    if (left.dataType.isInstanceOf[DoubleType] && right.dataType.isInstanceOf[DoubleType]) {
    -      TypeCheckResult.TypeCheckSuccess
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val xAvg = AttributeReference("xAvg", DoubleType, nullable = false)()
    +  protected val yAvg = AttributeReference("yAvg", DoubleType, nullable = false)()
    +  protected val ck = AttributeReference("ck", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes: Seq[AttributeReference] = Seq(count, xAvg, yAvg, ck)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* xAvg = */ Literal(0.0),
    +    /* yAvg = */ Literal(0.0),
    +    /* ck = */ Literal(0.0)
    +  )
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    +    val dx = x - xAvg
    +    val dy = y - yAvg
    +    val dyN = dy / n
    +    val newXAvg = xAvg + dx / n
    +    val newYAvg = yAvg + dyN
    +    val newCk = ck + dx * (dy - dyN)
    +
    +    val isNull = Or(IsNull(x), IsNull(y))
    --- End diff --
    
    Ditto.
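
    The co-moment recurrence behind the `count`/`xAvg`/`yAvg`/`ck` buffer can be sketched in plain Python (illustrative only, not the Catalyst expressions):

    ```python
    def cov_update(buf, x, y):
        """Fold one (x, y) pair into the covariance buffer [count, xAvg, yAvg, ck]."""
        count, x_avg, y_avg, ck = buf
        if x is None or y is None:  # mirrors the Or(IsNull(x), IsNull(y)) guard
            return buf
        n = count + 1.0
        dx = x - x_avg          # deviation from the old x mean
        dy = y - y_avg          # deviation from the old y mean
        dy_n = dy / n
        return [n, x_avg + dx / n, y_avg + dy_n, ck + dx * (dy - dy_n)]
    ```

    At evaluation time, dividing `ck` by `count` (or `count - 1`) yields the population (or sample) covariance.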




[GitHub] spark pull request: [SPARK-12963] Improve performance of stddev/va...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-175911728
  
    cc @mengxr 




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51534413
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -57,173 +55,170 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
     
       override def dataType: DataType = DoubleType
     
    -  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val avg = AttributeReference("avg", DoubleType, nullable = false)()
    +  protected val m2 = AttributeReference("m2", DoubleType, nullable = false)()
    +  protected val m3 = AttributeReference("m3", DoubleType, nullable = false)()
    +  protected val m4 = AttributeReference("m4", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes = Seq(count, avg, m2, m3, m4).take(momentOrder + 1)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* avg = */ Literal(0.0),
    +    /* m2 = */ Literal(0.0),
    +    /* m3 = */ Literal(0.0),
    +    /* m4 = */ Literal(0.0)
    +  ).take(momentOrder + 1)
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    +    val delta = child - avg
    +    val deltaN = delta / n
    +    val newAvg = avg + deltaN
    +    val newM2 = m2 + delta * (delta - deltaN)
    +
    +    val delta2 = delta * delta
    +    val deltaN2 = deltaN * deltaN
    +    val newM3 = m3 - Literal(3.0) * deltaN * newM2 + delta * (delta2 - deltaN2)
    +
    +    val newM4 = m4 - Literal(4.0) * deltaN * newM3 - Literal(6.0) * deltaN2 * newM2 +
    +      delta * (delta * delta2 - deltaN * deltaN2)
    +
    +    if (child.nullable) {
    +      Seq(
    +        /* count = */ If(IsNull(child), count, n),
    +        /* avg = */ If(IsNull(child), avg, newAvg),
    +        /* m2 = */ If(IsNull(child), m2, newM2),
    +        /* m3 = */ If(IsNull(child), m3, newM3),
    +        /* m4 = */ If(IsNull(child), m4, newM4)
    +      ).take(momentOrder + 1)
    +    } else {
    +      Seq(
    +        /* count = */ n,
    +        /* avg = */ newAvg,
    +        /* m2 = */ newM2,
    +        /* m3 = */ newM3,
    +        /* m4 = */ newM4
    +      ).take(momentOrder + 1)
    +    }
    +  }
     
    -  override def checkInputDataTypes(): TypeCheckResult =
    -    TypeUtils.checkForNumericExpr(child.dataType, s"function $prettyName")
    +  override lazy val mergeExpressions: Seq[Expression] = {
     
    -  override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
    +    val n1 = count.left
    +    val n2 = count.right
    +    val n = n1 + n2
    +    val delta = avg.right - avg.left
    +    val deltaN = If(EqualTo(n, Literal(0.0)), Literal(0.0), delta / n)
    +    val newAvg = avg.left + deltaN * n2
     
    -  /**
    -   * Size of aggregation buffer.
    -   */
    -  private[this] val bufferSize = 5
    +    // higher order moments computed according to:
    +    // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
    +    val newM2 = m2.left + m2.right + delta * deltaN * n1 * n2
    +    val newM3 = if (momentOrder >= 3) {
    --- End diff --
    
    `updateExpressions` does not need that.




[GitHub] spark pull request: [SPARK-12913] Improve performance of stat func...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176533998
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178752121
  
    LGTM pending Jenkins. It is great to see a 5x speedup!




[GitHub] spark pull request: [SPARK-12963] Improve performance of stat func...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176452126
  
    **[Test build #50304 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50304/consoleFull)** for PR 10960 at commit [`1b95b7c`](https://github.com/apache/spark/commit/1b95b7cde0aa1a2bc60710f3c7dfe46e24d475af).




[GitHub] spark pull request: [SPARK-12963] Improve performance of stddev/va...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-175914531
  
    **[Test build #50240 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50240/consoleFull)** for PR 10960 at commit [`61edd5e`](https://github.com/apache/spark/commit/61edd5e3a2c030d7387db5283eee5ada13553505).




[GitHub] spark pull request: [SPARK-12963] Improve performance of stddev/va...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176443931
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533698
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -57,173 +55,170 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
     
       override def dataType: DataType = DoubleType
     
    -  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val avg = AttributeReference("avg", DoubleType, nullable = false)()
    +  protected val m2 = AttributeReference("m2", DoubleType, nullable = false)()
    +  protected val m3 = AttributeReference("m3", DoubleType, nullable = false)()
    +  protected val m4 = AttributeReference("m4", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes = Seq(count, avg, m2, m3, m4).take(momentOrder + 1)
    --- End diff --
    
    Shall we make `.take(momentOrder + 1)` a private method (`trimHigherOrder`?) and have it documented?




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533764
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -57,173 +55,170 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
     
       override def dataType: DataType = DoubleType
     
    -  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val avg = AttributeReference("avg", DoubleType, nullable = false)()
    +  protected val m2 = AttributeReference("m2", DoubleType, nullable = false)()
    +  protected val m3 = AttributeReference("m3", DoubleType, nullable = false)()
    +  protected val m4 = AttributeReference("m4", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes = Seq(count, avg, m2, m3, m4).take(momentOrder + 1)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* avg = */ Literal(0.0),
    +    /* m2 = */ Literal(0.0),
    +    /* m3 = */ Literal(0.0),
    +    /* m4 = */ Literal(0.0)
    +  ).take(momentOrder + 1)
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    +    val delta = child - avg
    +    val deltaN = delta / n
    +    val newAvg = avg + deltaN
    +    val newM2 = m2 + delta * (delta - deltaN)
    +
    +    val delta2 = delta * delta
    +    val deltaN2 = deltaN * deltaN
    +    val newM3 = m3 - Literal(3.0) * deltaN * newM2 + delta * (delta2 - deltaN2)
    +
    +    val newM4 = m4 - Literal(4.0) * deltaN * newM3 - Literal(6.0) * deltaN2 * newM2 +
    +      delta * (delta * delta2 - deltaN * deltaN2)
    +
    +    if (child.nullable) {
    +      Seq(
    +        /* count = */ If(IsNull(child), count, n),
    +        /* avg = */ If(IsNull(child), avg, newAvg),
    +        /* m2 = */ If(IsNull(child), m2, newM2),
    +        /* m3 = */ If(IsNull(child), m3, newM3),
    +        /* m4 = */ If(IsNull(child), m4, newM4)
    +      ).take(momentOrder + 1)
    +    } else {
    +      Seq(
    +        /* count = */ n,
    +        /* avg = */ newAvg,
    +        /* m2 = */ newM2,
    +        /* m3 = */ newM3,
    +        /* m4 = */ newM4
    +      ).take(momentOrder + 1)
    +    }
    +  }
     
    -  override def checkInputDataTypes(): TypeCheckResult =
    -    TypeUtils.checkForNumericExpr(child.dataType, s"function $prettyName")
    +  override lazy val mergeExpressions: Seq[Expression] = {
     
    -  override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
    +    val n1 = count.left
    +    val n2 = count.right
    +    val n = n1 + n2
    +    val delta = avg.right - avg.left
    +    val deltaN = If(EqualTo(n, Literal(0.0)), Literal(0.0), delta / n)
    +    val newAvg = avg.left + deltaN * n2
     
    -  /**
    -   * Size of aggregation buffer.
    -   */
    -  private[this] val bufferSize = 5
    +    // higher order moments computed according to:
    +    // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
    +    val newM2 = m2.left + m2.right + delta * deltaN * n1 * n2
    +    val newM3 = if (momentOrder >= 3) {
    +      m3.left + m3.right + deltaN * deltaN * delta * n1 * n2 * (n1 - n2) +
    +        Literal(3.0) * deltaN * (n1 * m2.right - n2 * m2.left)
    +    } else {
    +      Literal(0.0)
    +    }
    +    val newM4 = if (momentOrder >= 4) {
    +      m4.left + m4.right +
    +        deltaN * deltaN * deltaN * delta * n1 * n2 * (n1 * n1 - n1 * n2 + n2 * n2) +
    +        Literal(6.0) * deltaN * deltaN * (n1 * n1 * m2.right + n2 * n2 * m2.left) +
    +        Literal(4.0) * deltaN * (n1 * m3.right - n2 * m3.left)
    +    } else {
    +      Literal(0.0)
    +    }
     
    -  override val aggBufferAttributes: Seq[AttributeReference] = Seq.tabulate(bufferSize) { i =>
    -    AttributeReference(s"M$i", DoubleType)()
    +    Seq(
    +      /* count = */ n,
    +      /* avg = */ newAvg,
    +      /* m2 = */ newM2,
    +      /* m3 = */ newM3,
    +      /* m4 = */ newM4
    +    ).take(momentOrder + 1)
       }
    +}
     
    -  // Note: although this simply copies aggBufferAttributes, this common code can not be placed
    -  // in the superclass because that will lead to initialization ordering issues.
    -  override val inputAggBufferAttributes: Seq[AttributeReference] =
    -    aggBufferAttributes.map(_.newInstance())
    -
    -  // buffer offsets
    -  private[this] val nOffset = mutableAggBufferOffset
    -  private[this] val meanOffset = mutableAggBufferOffset + 1
    -  private[this] val secondMomentOffset = mutableAggBufferOffset + 2
    -  private[this] val thirdMomentOffset = mutableAggBufferOffset + 3
    -  private[this] val fourthMomentOffset = mutableAggBufferOffset + 4
    -
    -  // frequently used values for online updates
    -  private[this] var delta = 0.0
    -  private[this] var deltaN = 0.0
    -  private[this] var delta2 = 0.0
    -  private[this] var deltaN2 = 0.0
    -  private[this] var n = 0.0
    -  private[this] var mean = 0.0
    -  private[this] var m2 = 0.0
    -  private[this] var m3 = 0.0
    -  private[this] var m4 = 0.0
    +// Compute the population standard deviation of a column
    +case class StddevPop(child: Expression) extends CentralMomentAgg(child) {
     
    -  /**
    -   * Initialize all moments to zero.
    -   */
    -  override def initialize(buffer: MutableRow): Unit = {
    -    for (aggIndex <- 0 until bufferSize) {
    -      buffer.setDouble(mutableAggBufferOffset + aggIndex, 0.0)
    -    }
    +  override protected def momentOrder = 2
    +
    +  override val evaluateExpression: Expression = {
    +    If(EqualTo(count, Literal(0.0)), Literal.create(null, DoubleType),
    +      Sqrt(m2 / count))
       }
     
    -  /**
    -   * Update the central moments buffer.
    -   */
    -  override def update(buffer: MutableRow, input: InternalRow): Unit = {
    -    val v = Cast(child, DoubleType).eval(input)
    -    if (v != null) {
    -      val updateValue = v match {
    -        case d: Double => d
    -      }
    -
    -      n = buffer.getDouble(nOffset)
    -      mean = buffer.getDouble(meanOffset)
    -
    -      n += 1.0
    -      buffer.setDouble(nOffset, n)
    -      delta = updateValue - mean
    -      deltaN = delta / n
    -      mean += deltaN
    -      buffer.setDouble(meanOffset, mean)
    -
    -      if (momentOrder >= 2) {
    -        m2 = buffer.getDouble(secondMomentOffset)
    -        m2 += delta * (delta - deltaN)
    -        buffer.setDouble(secondMomentOffset, m2)
    -      }
    -
    -      if (momentOrder >= 3) {
    -        delta2 = delta * delta
    -        deltaN2 = deltaN * deltaN
    -        m3 = buffer.getDouble(thirdMomentOffset)
    -        m3 += -3.0 * deltaN * m2 + delta * (delta2 - deltaN2)
    -        buffer.setDouble(thirdMomentOffset, m3)
    -      }
    -
    -      if (momentOrder >= 4) {
    -        m4 = buffer.getDouble(fourthMomentOffset)
    -        m4 += -4.0 * deltaN * m3 - 6.0 * deltaN2 * m2 +
    -          delta * (delta * delta2 - deltaN * deltaN2)
    -        buffer.setDouble(fourthMomentOffset, m4)
    -      }
    -    }
    +  override def prettyName: String = "stddev_pop"
    +}
    +
    +// Compute the sample standard deviation of a column
    +case class StddevSamp(child: Expression) extends CentralMomentAgg(child) {
    +
    +  override protected def momentOrder = 2
    +
    +  override val evaluateExpression: Expression = {
    +    If(EqualTo(count, Literal(0.0)), Literal.create(null, DoubleType),
    --- End diff --
    
    ditto on `EqualTo` => `===`




[GitHub] spark pull request: [SPARK-12963] Improve performance of stat func...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176452995
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50303/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176901504
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50386/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12963] Improve performance of stat func...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176451450
  
    Build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12963] Improve performance of stddev/va...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176383086
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178445288
  
    **[Test build #50560 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50560/consoleFull)** for PR 10960 at commit [`9b74195`](https://github.com/apache/spark/commit/9b74195237cccfe23463e737687e7e06331e769f).




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176877210
  
    **[Test build #50384 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50384/consoleFull)** for PR 10960 at commit [`1086810`](https://github.com/apache/spark/commit/10868109253b354498bb32ad6e2f96d1d4971203).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class SetDatabaseCommand(databaseName: String) extends RunnableCommand `




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533870
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -57,173 +55,170 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
     
       override def dataType: DataType = DoubleType
     
    -  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val avg = AttributeReference("avg", DoubleType, nullable = false)()
    +  protected val m2 = AttributeReference("m2", DoubleType, nullable = false)()
    +  protected val m3 = AttributeReference("m3", DoubleType, nullable = false)()
    +  protected val m4 = AttributeReference("m4", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes = Seq(count, avg, m2, m3, m4).take(momentOrder + 1)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* avg = */ Literal(0.0),
    +    /* m2 = */ Literal(0.0),
    +    /* m3 = */ Literal(0.0),
    +    /* m4 = */ Literal(0.0)
    +  ).take(momentOrder + 1)
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    +    val delta = child - avg
    +    val deltaN = delta / n
    +    val newAvg = avg + deltaN
    +    val newM2 = m2 + delta * (delta - deltaN)
    +
    +    val delta2 = delta * delta
    +    val deltaN2 = deltaN * deltaN
    +    val newM3 = m3 - Literal(3.0) * deltaN * newM2 + delta * (delta2 - deltaN2)
    +
    +    val newM4 = m4 - Literal(4.0) * deltaN * newM3 - Literal(6.0) * deltaN2 * newM2 +
    +      delta * (delta * delta2 - deltaN * deltaN2)
    +
    +    if (child.nullable) {
    +      Seq(
    +        /* count = */ If(IsNull(child), count, n),
    +        /* avg = */ If(IsNull(child), avg, newAvg),
    +        /* m2 = */ If(IsNull(child), m2, newM2),
    +        /* m3 = */ If(IsNull(child), m3, newM3),
    +        /* m4 = */ If(IsNull(child), m4, newM4)
    +      ).take(momentOrder + 1)
    +    } else {
    +      Seq(
    +        /* count = */ n,
    +        /* avg = */ newAvg,
    +        /* m2 = */ newM2,
    +        /* m3 = */ newM3,
    +        /* m4 = */ newM4
    +      ).take(momentOrder + 1)
    +    }
    +  }
     
    -  override def checkInputDataTypes(): TypeCheckResult =
    -    TypeUtils.checkForNumericExpr(child.dataType, s"function $prettyName")
    +  override lazy val mergeExpressions: Seq[Expression] = {
     
    -  override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
    +    val n1 = count.left
    +    val n2 = count.right
    +    val n = n1 + n2
    +    val delta = avg.right - avg.left
    +    val deltaN = If(EqualTo(n, Literal(0.0)), Literal(0.0), delta / n)
    +    val newAvg = avg.left + deltaN * n2
     
    -  /**
    -   * Size of aggregation buffer.
    -   */
    -  private[this] val bufferSize = 5
    +    // higher order moments computed according to:
    +    // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
    +    val newM2 = m2.left + m2.right + delta * deltaN * n1 * n2
    +    val newM3 = if (momentOrder >= 3) {
    --- End diff --
    
    These expressions require `m3.right`, which is not valid if momentOrder < 3. I will add a comment.
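
The update recurrences quoted in the diff above (Welford-style one-pass updates for count, avg, m2, m3, m4) can be sanity-checked outside Spark. The following Python sketch is illustrative only, not Spark code: it replays the same formulas over a stream of values and compares the results against two-pass central moments.

```python
import random

def update(state, x):
    # Same recurrences as the updateExpressions in the diff above.
    count, avg, m2, m3, m4 = state
    n = count + 1.0
    delta = x - avg
    delta_n = delta / n
    new_avg = avg + delta_n
    new_m2 = m2 + delta * (delta - delta_n)
    delta2 = delta * delta
    delta_n2 = delta_n * delta_n
    new_m3 = m3 - 3.0 * delta_n * new_m2 + delta * (delta2 - delta_n2)
    new_m4 = (m4 - 4.0 * delta_n * new_m3 - 6.0 * delta_n2 * new_m2
              + delta * (delta * delta2 - delta_n * delta_n2))
    return (n, new_avg, new_m2, new_m3, new_m4)

random.seed(1)
xs = [random.gauss(10.0, 3.0) for _ in range(1000)]

state = (0.0, 0.0, 0.0, 0.0, 0.0)
for x in xs:
    state = update(state, x)

# Two-pass reference values for the same data.
mean = sum(xs) / len(xs)
exact = tuple(sum((x - mean) ** p for x in xs) for p in (2, 3, 4))
```

For 1000 gaussian samples the streamed m2, m3, and m4 agree with the two-pass values to within floating-point noise, which is a quick way to convince oneself the recurrences are right before reading the generated code.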




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533804
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala ---
    @@ -148,7 +197,8 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
       }
     
       test("benchmark") {
    --- End diff --
    
    Shall we add a comment saying that these benchmarks are skipped in a normal build, or change `test("benchmark")` to `ignore("benchmark")` to make it clear?




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176949997
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178426025
  
    @davies I made one pass. It would be nice to have a JIRA for checking query results with a tolerance for numerical differences, because the result might change (though unlikely) if we merge the partial results in a different order.
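
To illustrate the point: a result that is mathematically order-independent can still differ bitwise when partial aggregates are merged in a different order, so a tolerance-based comparison is the robust check. A small Python sketch (the `var_merged` helper is hypothetical, not part of Spark):

```python
import math
import random

def var_merged(xs, chunk):
    # Merge per-chunk (count, sum, sum-of-squares) partials, mimicking how
    # partial aggregates are combined; the merge order depends on `chunk`.
    n = s = s2 = 0.0
    for i in range(0, len(xs), chunk):
        part = xs[i:i + chunk]
        n += len(part)
        s += sum(part)
        s2 += sum(x * x for x in part)
    return (s2 - s * s / n) / n

random.seed(3)
xs = [random.gauss(5.0, 2.0) for _ in range(10000)]
v1 = var_merged(xs, 64)
v2 = var_merged(list(reversed(xs)), 37)
# Bitwise equality (v1 == v2) may fail; a relative tolerance is robust.
close = math.isclose(v1, v2, rel_tol=1e-6)
```

The two merge orders produce variances that agree to within the tolerance even when they are not bit-identical.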




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178753559
  
    **[Test build #50569 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50569/consoleFull)** for PR 10960 at commit [`5f98588`](https://github.com/apache/spark/commit/5f98588ed25eb8f4b94594922d8ca4c1847abca6).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class PrintToStderr(child: Expression) extends UnaryExpression `




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178754165
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50569/
    Test PASSed.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533735
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -57,173 +55,170 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
     
       override def dataType: DataType = DoubleType
     
    -  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val avg = AttributeReference("avg", DoubleType, nullable = false)()
    +  protected val m2 = AttributeReference("m2", DoubleType, nullable = false)()
    +  protected val m3 = AttributeReference("m3", DoubleType, nullable = false)()
    +  protected val m4 = AttributeReference("m4", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes = Seq(count, avg, m2, m3, m4).take(momentOrder + 1)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* avg = */ Literal(0.0),
    +    /* m2 = */ Literal(0.0),
    +    /* m3 = */ Literal(0.0),
    +    /* m4 = */ Literal(0.0)
    +  ).take(momentOrder + 1)
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    +    val delta = child - avg
    +    val deltaN = delta / n
    +    val newAvg = avg + deltaN
    +    val newM2 = m2 + delta * (delta - deltaN)
    +
    +    val delta2 = delta * delta
    +    val deltaN2 = deltaN * deltaN
    +    val newM3 = m3 - Literal(3.0) * deltaN * newM2 + delta * (delta2 - deltaN2)
    +
    +    val newM4 = m4 - Literal(4.0) * deltaN * newM3 - Literal(6.0) * deltaN2 * newM2 +
    +      delta * (delta * delta2 - deltaN * deltaN2)
    +
    +    if (child.nullable) {
    +      Seq(
    +        /* count = */ If(IsNull(child), count, n),
    +        /* avg = */ If(IsNull(child), avg, newAvg),
    +        /* m2 = */ If(IsNull(child), m2, newM2),
    +        /* m3 = */ If(IsNull(child), m3, newM3),
    +        /* m4 = */ If(IsNull(child), m4, newM4)
    +      ).take(momentOrder + 1)
    +    } else {
    +      Seq(
    +        /* count = */ n,
    +        /* avg = */ newAvg,
    +        /* m2 = */ newM2,
    +        /* m3 = */ newM3,
    +        /* m4 = */ newM4
    +      ).take(momentOrder + 1)
    +    }
    +  }
     
    -  override def checkInputDataTypes(): TypeCheckResult =
    -    TypeUtils.checkForNumericExpr(child.dataType, s"function $prettyName")
    +  override lazy val mergeExpressions: Seq[Expression] = {
     
    -  override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
    +    val n1 = count.left
    +    val n2 = count.right
    +    val n = n1 + n2
    +    val delta = avg.right - avg.left
    +    val deltaN = If(EqualTo(n, Literal(0.0)), Literal(0.0), delta / n)
    --- End diff --
    
    Shall we use `===` to replace `EqualTo` to make the code more readable?




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51534391
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala ---
    @@ -436,3 +436,21 @@ case class Murmur3Hash(children: Seq[Expression], seed: Int) extends Expression
         }
       }
     }
    +
    +/**
    +  * An expression that will print the value of child to stderr (used for debugging codegen).
    +  */
    +case class Echo(child: Expression) extends UnaryExpression {
    --- End diff --
    
    how is this used, or how can this be used?





[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51534225
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala ---
    @@ -436,3 +436,21 @@ case class Murmur3Hash(children: Seq[Expression], seed: Int) extends Expression
         }
       }
     }
    +
    +/**
    +  * An expression that will print the value of child to stderr (used for debugging codegen).
    +  */
    +case class Echo(child: Expression) extends UnaryExpression {
    --- End diff --
    
    This is helpful for debugging generated expressions, especially for these complicated functions.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51535734
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala ---
    @@ -436,3 +436,21 @@ case class Murmur3Hash(children: Seq[Expression], seed: Int) extends Expression
         }
       }
     }
    +
    +/**
    +  * An expression that will print the value of child to stderr (used for debugging codegen).
    +  */
    +case class Echo(child: Expression) extends UnaryExpression {
    --- End diff --
    
    So this is only used if you construct the expressions manually?
    
    I'd rename this to PrintToStderr. Echo implies that it returns whatever you type in, which is not what this is about.





[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533886
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -57,173 +55,170 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
     
       override def dataType: DataType = DoubleType
     
    -  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val avg = AttributeReference("avg", DoubleType, nullable = false)()
    +  protected val m2 = AttributeReference("m2", DoubleType, nullable = false)()
    +  protected val m3 = AttributeReference("m3", DoubleType, nullable = false)()
    +  protected val m4 = AttributeReference("m4", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes = Seq(count, avg, m2, m3, m4).take(momentOrder + 1)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* avg = */ Literal(0.0),
    +    /* m2 = */ Literal(0.0),
    +    /* m3 = */ Literal(0.0),
    +    /* m4 = */ Literal(0.0)
    +  ).take(momentOrder + 1)
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    +    val delta = child - avg
    +    val deltaN = delta / n
    +    val newAvg = avg + deltaN
    +    val newM2 = m2 + delta * (delta - deltaN)
    +
    +    val delta2 = delta * delta
    +    val deltaN2 = deltaN * deltaN
    +    val newM3 = m3 - Literal(3.0) * deltaN * newM2 + delta * (delta2 - deltaN2)
    +
    +    val newM4 = m4 - Literal(4.0) * deltaN * newM3 - Literal(6.0) * deltaN2 * newM2 +
    +      delta * (delta * delta2 - deltaN * deltaN2)
    +
    +    if (child.nullable) {
    +      Seq(
    +        /* count = */ If(IsNull(child), count, n),
    +        /* avg = */ If(IsNull(child), avg, newAvg),
    +        /* m2 = */ If(IsNull(child), m2, newM2),
    +        /* m3 = */ If(IsNull(child), m3, newM3),
    +        /* m4 = */ If(IsNull(child), m4, newM4)
    +      ).take(momentOrder + 1)
    +    } else {
    +      Seq(
    +        /* count = */ n,
    +        /* avg = */ newAvg,
    +        /* m2 = */ newM2,
    +        /* m3 = */ newM3,
    +        /* m4 = */ newM4
    +      ).take(momentOrder + 1)
    +    }
    +  }
     
    -  override def checkInputDataTypes(): TypeCheckResult =
    -    TypeUtils.checkForNumericExpr(child.dataType, s"function $prettyName")
    +  override lazy val mergeExpressions: Seq[Expression] = {
     
    -  override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
    +    val n1 = count.left
    +    val n2 = count.right
    +    val n = n1 + n2
    +    val delta = avg.right - avg.left
    +    val deltaN = If(EqualTo(n, Literal(0.0)), Literal(0.0), delta / n)
    +    val newAvg = avg.left + deltaN * n2
     
    -  /**
    -   * Size of aggregation buffer.
    -   */
    -  private[this] val bufferSize = 5
    +    // higher order moments computed according to:
    +    // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
    +    val newM2 = m2.left + m2.right + delta * deltaN * n1 * n2
    +    val newM3 = if (momentOrder >= 3) {
    +      m3.left + m3.right + deltaN * deltaN * delta * n1 * n2 * (n1 - n2) +
    +        Literal(3.0) * deltaN * (n1 * m2.right - n2 * m2.left)
    +    } else {
    +      Literal(0.0)
    +    }
    +    val newM4 = if (momentOrder >= 4) {
    +      m4.left + m4.right +
    +        deltaN * deltaN * deltaN * delta * n1 * n2 * (n1 * n1 - n1 * n2 + n2 * n2) +
    +        Literal(6.0) * deltaN * deltaN * (n1 * n1 * m2.right + n2 * n2 * m2.left) +
    +        Literal(4.0) * deltaN * (n1 * m3.right - n2 * m3.left)
    +    } else {
    +      Literal(0.0)
    +    }
     
    -  override val aggBufferAttributes: Seq[AttributeReference] = Seq.tabulate(bufferSize) { i =>
    -    AttributeReference(s"M$i", DoubleType)()
    +    Seq(
    +      /* count = */ n,
    +      /* avg = */ newAvg,
    +      /* m2 = */ newM2,
    +      /* m3 = */ newM3,
    +      /* m4 = */ newM4
    +    ).take(momentOrder + 1)
    --- End diff --
    
    This could be simplified to `trimHigherOrder(Seq(n, newAvg, newM2, newM3, newM4))` without reducing readability.
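
The pairwise merge formulas in the diff above (from the Wikipedia article it cites) can likewise be checked numerically. A Python sketch, assuming the same (count, avg, m2, m3, m4) buffer layout, merges the moments of two partitions and compares against moments computed over the whole dataset:

```python
import random

def moments(xs):
    # Two-pass (count, avg, m2, m3, m4) for a list of values.
    n = float(len(xs))
    mean = sum(xs) / n
    return (n, mean,
            sum((x - mean) ** 2 for x in xs),
            sum((x - mean) ** 3 for x in xs),
            sum((x - mean) ** 4 for x in xs))

def merge(s1, s2):
    # Same formulas as the mergeExpressions in the diff above.
    n1, a1, m2a, m3a, m4a = s1
    n2, a2, m2b, m3b, m4b = s2
    n = n1 + n2
    delta = a2 - a1
    delta_n = 0.0 if n == 0.0 else delta / n
    avg = a1 + delta_n * n2
    m2 = m2a + m2b + delta * delta_n * n1 * n2
    m3 = (m3a + m3b + delta_n * delta_n * delta * n1 * n2 * (n1 - n2)
          + 3.0 * delta_n * (n1 * m2b - n2 * m2a))
    m4 = (m4a + m4b
          + delta_n * delta_n * delta_n * delta * n1 * n2
            * (n1 * n1 - n1 * n2 + n2 * n2)
          + 6.0 * delta_n * delta_n * (n1 * n1 * m2b + n2 * n2 * m2a)
          + 4.0 * delta_n * (n1 * m3b - n2 * m3a))
    return (n, avg, m2, m3, m4)

random.seed(2)
xs = [random.gauss(0.0, 1.0) for _ in range(2000)]
merged = merge(moments(xs[:700]), moments(xs[700:]))
whole = moments(xs)
```

Merging the two partial buffers reproduces the single-pass moments over the full list up to floating-point noise, which is exactly the property the merge path of the aggregate relies on.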




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176842635
  
    **[Test build #50384 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50384/consoleFull)** for PR 10960 at commit [`1086810`](https://github.com/apache/spark/commit/10868109253b354498bb32ad6e2f96d1d4971203).




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51294182
  
    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala ---
    @@ -790,7 +790,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
             """
               |SELECT corr(b, c) FROM covar_tab WHERE a = 3
             """.stripMargin),
    -      Row(null) :: Nil)
    +      Row(Double.NaN) :: Nil)
    --- End diff --
    
    This is changed to match the behavior of stddev_samp and other functions.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533778
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Corr.scala ---
    @@ -29,165 +28,95 @@ import org.apache.spark.sql.types._
      * Definition of Pearson correlation can be found at
      * http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient
      */
    -case class Corr(
    -    left: Expression,
    -    right: Expression,
    -    mutableAggBufferOffset: Int = 0,
    -    inputAggBufferOffset: Int = 0)
    -  extends ImperativeAggregate {
    -
    -  def this(left: Expression, right: Expression) =
    -    this(left, right, mutableAggBufferOffset = 0, inputAggBufferOffset = 0)
    -
    -  override def children: Seq[Expression] = Seq(left, right)
    +case class Corr(x: Expression, y: Expression) extends DeclarativeAggregate {
     
    +  override def children: Seq[Expression] = Seq(x, y)
       override def nullable: Boolean = true
    -
       override def dataType: DataType = DoubleType
    -
       override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType, DoubleType)
     
    -  override def checkInputDataTypes(): TypeCheckResult = {
    -    if (left.dataType.isInstanceOf[DoubleType] && right.dataType.isInstanceOf[DoubleType]) {
    -      TypeCheckResult.TypeCheckSuccess
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val xAvg = AttributeReference("xAvg", DoubleType, nullable = false)()
    +  protected val yAvg = AttributeReference("yAvg", DoubleType, nullable = false)()
    +  protected val ck = AttributeReference("ck", DoubleType, nullable = false)()
    +  protected val xMk = AttributeReference("xMk", DoubleType, nullable = false)()
    +  protected val yMk = AttributeReference("yMk", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes: Seq[AttributeReference] = Seq(count, xAvg, yAvg, ck, xMk, yMk)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* xAvg = */ Literal(0.0),
    +    /* yAvg = */ Literal(0.0),
    +    /* ck = */ Literal(0.0),
    +    /* xMk = */ Literal(0.0),
    +    /* yMk = */ Literal(0.0)
    +  )
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    +    val dx = x - xAvg
    +    val dxN = dx / n
    +    val dy = y - yAvg
    +    val dyN = dy / n
    +    val newXAvg = xAvg + dxN
    +    val newYAvg = yAvg + dyN
    +    val newCk = ck + dx * (dy - dyN)
    +    val newXMk = xMk + dx * (dx - dxN)
    +    val newYMk = yMk + dy * (dy - dyN)
    +
    +    val isNull = Or(IsNull(x), IsNull(y))
    +    if (x.nullable || y.nullable) {
    +      Seq(
    +        /* count = */ If(isNull, count, n),
    +        /* xAvg = */ If(isNull, xAvg, newXAvg),
    +        /* yAvg = */ If(isNull, yAvg, newYAvg),
    +        /* ck = */ If(isNull, ck, newCk),
    +        /* xMk = */ If(isNull, xMk, newXMk),
    +        /* yMk = */ If(isNull, yMk, newYMk)
    +      )
         } else {
    -      TypeCheckResult.TypeCheckFailure(
    -        s"corr requires that both arguments are double type, " +
    -          s"not (${left.dataType}, ${right.dataType}).")
    +      Seq(
    +        /* count = */ n,
    +        /* xAvg = */ newXAvg,
    +        /* yAvg = */ newYAvg,
    +        /* ck = */ newCk,
    +        /* xMk = */ newXMk,
    +        /* yMk = */ newYMk
    +      )
         }
       }
     
    -  override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
    -
    -  override def inputAggBufferAttributes: Seq[AttributeReference] = {
    -    aggBufferAttributes.map(_.newInstance())
    +  override val mergeExpressions: Seq[Expression] = {
    +
    +    val n1 = count.left
    +    val n2 = count.right
    +    val n = n1 + n2
    +    val dx = xAvg.right - xAvg.left
    +    val dxN = If(EqualTo(n, Literal(0.0)), Literal(0.0), dx / n)
    --- End diff --
    
    `EqualTo` => `===`
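
The Corr update expressions in the diff above implement the standard online co-moment recurrence. A Python sketch (illustrative only, not Spark code) replays it and compares against the two-pass Pearson formula:

```python
import random

def corr_online(pairs):
    # Mirrors the updateExpressions in the Corr diff: after updating the
    # averages, ck += dx * (dy - dy/n) is the same as dx * (y - newYAvg).
    n = x_avg = y_avg = ck = x_mk = y_mk = 0.0
    for x, y in pairs:
        n += 1.0
        dx = x - x_avg
        dy = y - y_avg
        x_avg += dx / n
        y_avg += dy / n
        ck += dx * (y - y_avg)
        x_mk += dx * (x - x_avg)
        y_mk += dy * (y - y_avg)
    return ck / (x_mk * y_mk) ** 0.5

def corr_two_pass(pairs):
    # Reference implementation of Pearson correlation.
    n = len(pairs)
    mx = sum(x for x, _ in pairs) / n
    my = sum(y for _, y in pairs) / n
    sxy = sum((x - mx) * (y - my) for x, y in pairs)
    sxx = sum((x - mx) ** 2 for x, _ in pairs)
    syy = sum((y - my) ** 2 for _, y in pairs)
    return sxy / (sxx * syy) ** 0.5

random.seed(4)
pairs = [(x, 0.7 * x + random.gauss(0.0, 0.5)) for x in
         (random.gauss(0.0, 1.0) for _ in range(1000))]
r1 = corr_online(pairs)
r2 = corr_two_pass(pairs)
```

The one-pass and two-pass results agree closely, confirming the recurrence before reading the generated code.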




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178801436
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178456219
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50560/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533793
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
    @@ -198,7 +198,8 @@ case class Window(
               functions,
               ordinal,
               child.output,
    -          (expressions, schema) => newMutableProjection(expressions, schema))
    +          (expressions, schema) =>
    +            newMutableProjection(expressions, schema, subexpressionEliminationEnabled))
    --- End diff --
    
    This would definitely help the computation of summary statistics. But would `subexpressionEliminationEnabled` have other side effects? cc @nongli 




[GitHub] spark pull request: [SPARK-12963] Improve performance of stat func...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176451454
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50302/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51535532
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala ---
    @@ -436,3 +436,21 @@ case class Murmur3Hash(children: Seq[Expression], seed: Int) extends Expression
         }
       }
     }
    +
    +/**
    +  * An expression that will print the value of child to stderr (used for debugging codegen).
    +  */
    +case class Echo(child: Expression) extends UnaryExpression {
    --- End diff --
    
    This could be used to print out the result of any intermediate expression, for example:
    ```
      Sqrt(Echo(a * b))
    ```
    This lets you see the result of `a * b`.
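
The same trick is common outside codegen: an identity function that prints its argument before returning it lets you inspect an intermediate value without restructuring the expression. A Python analogue (the `echo` helper here is illustrative, not part of any library):

```python
import math
import sys

def echo(value, label="echo"):
    # Identity function with a side effect: print to stderr, return unchanged.
    print(f"{label}: {value!r}", file=sys.stderr)
    return value

# Inspect `a * b` inside a larger expression, as in Sqrt(Echo(a * b)):
a, b = 3.0, 12.0
result = math.sqrt(echo(a * b, label="a * b"))
```

Because the wrapper returns its input unchanged, the surrounding computation is unaffected.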




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51535488
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -57,173 +55,170 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
     
       override def dataType: DataType = DoubleType
     
    -  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val avg = AttributeReference("avg", DoubleType, nullable = false)()
    +  protected val m2 = AttributeReference("m2", DoubleType, nullable = false)()
    +  protected val m3 = AttributeReference("m3", DoubleType, nullable = false)()
    +  protected val m4 = AttributeReference("m4", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes = Seq(count, avg, m2, m3, m4).take(momentOrder + 1)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* avg = */ Literal(0.0),
    +    /* m2 = */ Literal(0.0),
    +    /* m3 = */ Literal(0.0),
    +    /* m4 = */ Literal(0.0)
    +  ).take(momentOrder + 1)
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    +    val delta = child - avg
    +    val deltaN = delta / n
    +    val newAvg = avg + deltaN
    +    val newM2 = m2 + delta * (delta - deltaN)
    +
    +    val delta2 = delta * delta
    +    val deltaN2 = deltaN * deltaN
    +    val newM3 = m3 - Literal(3.0) * deltaN * newM2 + delta * (delta2 - deltaN2)
    +
    +    val newM4 = m4 - Literal(4.0) * deltaN * newM3 - Literal(6.0) * deltaN2 * newM2 +
    +      delta * (delta * delta2 - deltaN * deltaN2)
    +
    +    if (child.nullable) {
    +      Seq(
    +        /* count = */ If(IsNull(child), count, n),
    +        /* avg = */ If(IsNull(child), avg, newAvg),
    +        /* m2 = */ If(IsNull(child), m2, newM2),
    +        /* m3 = */ If(IsNull(child), m3, newM3),
    +        /* m4 = */ If(IsNull(child), m4, newM4)
    +      ).take(momentOrder + 1)
    +    } else {
    +      Seq(
    +        /* count = */ n,
    +        /* avg = */ newAvg,
    +        /* m2 = */ newM2,
    +        /* m3 = */ newM3,
    +        /* m4 = */ newM4
    +      ).take(momentOrder + 1)
    +    }
    +  }
     
    -  override def checkInputDataTypes(): TypeCheckResult =
    -    TypeUtils.checkForNumericExpr(child.dataType, s"function $prettyName")
    +  override lazy val mergeExpressions: Seq[Expression] = {
     
    -  override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
    +    val n1 = count.left
    +    val n2 = count.right
    +    val n = n1 + n2
    +    val delta = avg.right - avg.left
    +    val deltaN = If(EqualTo(n, Literal(0.0)), Literal(0.0), delta / n)
    +    val newAvg = avg.left + deltaN * n2
     
    -  /**
    -   * Size of aggregation buffer.
    -   */
    -  private[this] val bufferSize = 5
    +    // higher order moments computed according to:
    +    // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
    +    val newM2 = m2.left + m2.right + delta * deltaN * n1 * n2
    +    val newM3 = if (momentOrder >= 3) {
    +      m3.left + m3.right + deltaN * deltaN * delta * n1 * n2 * (n1 - n2) +
    +        Literal(3.0) * deltaN * (n1 * m2.right - n2 * m2.left)
    +    } else {
    +      Literal(0.0)
    +    }
    +    val newM4 = if (momentOrder >= 4) {
    +      m4.left + m4.right +
    +        deltaN * deltaN * deltaN * delta * n1 * n2 * (n1 * n1 - n1 * n2 + n2 * n2) +
    +        Literal(6.0) * deltaN * deltaN * (n1 * n1 * m2.right + n2 * n2 * m2.left) +
    +        Literal(4.0) * deltaN * (n1 * m3.right - n2 * m3.left)
    +    } else {
    +      Literal(0.0)
    +    }
     
    -  override val aggBufferAttributes: Seq[AttributeReference] = Seq.tabulate(bufferSize) { i =>
    -    AttributeReference(s"M$i", DoubleType)()
    +    Seq(
    +      /* count = */ n,
    +      /* avg = */ newAvg,
    +      /* m2 = */ newM2,
    +      /* m3 = */ newM3,
    +      /* m4 = */ newM4
    +    ).take(momentOrder + 1)
       }
    +}
     
    -  // Note: although this simply copies aggBufferAttributes, this common code can not be placed
    -  // in the superclass because that will lead to initialization ordering issues.
    -  override val inputAggBufferAttributes: Seq[AttributeReference] =
    -    aggBufferAttributes.map(_.newInstance())
    -
    -  // buffer offsets
    -  private[this] val nOffset = mutableAggBufferOffset
    -  private[this] val meanOffset = mutableAggBufferOffset + 1
    -  private[this] val secondMomentOffset = mutableAggBufferOffset + 2
    -  private[this] val thirdMomentOffset = mutableAggBufferOffset + 3
    -  private[this] val fourthMomentOffset = mutableAggBufferOffset + 4
    -
    -  // frequently used values for online updates
    -  private[this] var delta = 0.0
    -  private[this] var deltaN = 0.0
    -  private[this] var delta2 = 0.0
    -  private[this] var deltaN2 = 0.0
    -  private[this] var n = 0.0
    -  private[this] var mean = 0.0
    -  private[this] var m2 = 0.0
    -  private[this] var m3 = 0.0
    -  private[this] var m4 = 0.0
    +// Compute the population standard deviation of a column
    +case class StddevPop(child: Expression) extends CentralMomentAgg(child) {
    --- End diff --
    
    Sounds good.




[GitHub] spark pull request: [SPARK-12913] Improve performance of stat func...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176506009
  
    **[Test build #50322 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50322/consoleFull)** for PR 10960 at commit [`ae78e81`](https://github.com/apache/spark/commit/ae78e81f70af03a79cbfdbb22f06cc48c04781d4).




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178448871
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50558/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12963] Improve performance of stddev/va...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176068170
  
    **[Test build #50265 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50265/consoleFull)** for PR 10960 at commit [`3c8d737`](https://github.com/apache/spark/commit/3c8d737d5ee3ce34dee494dc3fac3090d983775a).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12963] Improve performance of stat func...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176452990
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12963] Improve performance of stddev/va...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176383074
  
    **[Test build #50294 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50294/consoleFull)** for PR 10960 at commit [`ae83955`](https://github.com/apache/spark/commit/ae83955ea3a34e38ce55d99f741c99f1f8b2fa8f).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `abstract class CentralMomentAgg(child: Expression) extends DeclarativeAggregate `
      * `case class Kurtosis(child: Expression) extends CentralMomentAgg(child) `
      * `case class Skewness(child: Expression) extends CentralMomentAgg(child) `
      * `case class Echo(child: Expression) extends UnaryExpression `




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-177066729
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178392048
  
    I'm making a pass.




[GitHub] spark pull request: [SPARK-12963] Improve performance of stat func...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176476050
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12963] Improve performance of stat func...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176475892
  
    **[Test build #50304 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50304/consoleFull)** for PR 10960 at commit [`1b95b7c`](https://github.com/apache/spark/commit/1b95b7cde0aa1a2bc60710f3c7dfe46e24d475af).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51535285
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -57,173 +55,170 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
     
       override def dataType: DataType = DoubleType
     
    -  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val avg = AttributeReference("avg", DoubleType, nullable = false)()
    +  protected val m2 = AttributeReference("m2", DoubleType, nullable = false)()
    +  protected val m3 = AttributeReference("m3", DoubleType, nullable = false)()
    +  protected val m4 = AttributeReference("m4", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes = Seq(count, avg, m2, m3, m4).take(momentOrder + 1)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* avg = */ Literal(0.0),
    +    /* m2 = */ Literal(0.0),
    +    /* m3 = */ Literal(0.0),
    +    /* m4 = */ Literal(0.0)
    +  ).take(momentOrder + 1)
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    +    val delta = child - avg
    +    val deltaN = delta / n
    +    val newAvg = avg + deltaN
    +    val newM2 = m2 + delta * (delta - deltaN)
    +
    +    val delta2 = delta * delta
    +    val deltaN2 = deltaN * deltaN
    +    val newM3 = m3 - Literal(3.0) * deltaN * newM2 + delta * (delta2 - deltaN2)
    +
    +    val newM4 = m4 - Literal(4.0) * deltaN * newM3 - Literal(6.0) * deltaN2 * newM2 +
    +      delta * (delta * delta2 - deltaN * deltaN2)
    +
    +    if (child.nullable) {
    --- End diff --
    
    The `if` here is not necessary and will be removed (the compiler can eliminate it).
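
For reference, the one-pass update recurrence in the `updateExpressions` diff above can be checked outside of Spark's expression DSL. This is a plain-Python sketch of the same arithmetic; the names mirror the Scala locals, but it is an illustration, not Spark code:

```python
def update(count, avg, m2, m3, m4, x):
    """One-pass update of count, mean, and central moment sums m2..m4
    with a new value x, mirroring the arithmetic in updateExpressions."""
    n = count + 1.0
    delta = x - avg
    delta_n = delta / n
    new_avg = avg + delta_n
    new_m2 = m2 + delta * (delta - delta_n)

    delta2 = delta * delta
    delta_n2 = delta_n * delta_n
    new_m3 = m3 - 3.0 * delta_n * new_m2 + delta * (delta2 - delta_n2)
    new_m4 = (m4 - 4.0 * delta_n * new_m3 - 6.0 * delta_n2 * new_m2
              + delta * (delta * delta2 - delta_n * delta_n2))
    return n, new_avg, new_m2, new_m3, new_m4
```

Folding a column through `update` yields the count, the mean, and the central moment sums from which variance, stddev, skewness, and kurtosis are derived.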




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533740
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -57,173 +55,170 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
     
       override def dataType: DataType = DoubleType
     
    -  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val avg = AttributeReference("avg", DoubleType, nullable = false)()
    +  protected val m2 = AttributeReference("m2", DoubleType, nullable = false)()
    +  protected val m3 = AttributeReference("m3", DoubleType, nullable = false)()
    +  protected val m4 = AttributeReference("m4", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes = Seq(count, avg, m2, m3, m4).take(momentOrder + 1)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* avg = */ Literal(0.0),
    +    /* m2 = */ Literal(0.0),
    +    /* m3 = */ Literal(0.0),
    +    /* m4 = */ Literal(0.0)
    +  ).take(momentOrder + 1)
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    +    val delta = child - avg
    +    val deltaN = delta / n
    +    val newAvg = avg + deltaN
    +    val newM2 = m2 + delta * (delta - deltaN)
    +
    +    val delta2 = delta * delta
    +    val deltaN2 = deltaN * deltaN
    +    val newM3 = m3 - Literal(3.0) * deltaN * newM2 + delta * (delta2 - deltaN2)
    +
    +    val newM4 = m4 - Literal(4.0) * deltaN * newM3 - Literal(6.0) * deltaN2 * newM2 +
    +      delta * (delta * delta2 - deltaN * deltaN2)
    +
    +    if (child.nullable) {
    +      Seq(
    +        /* count = */ If(IsNull(child), count, n),
    +        /* avg = */ If(IsNull(child), avg, newAvg),
    +        /* m2 = */ If(IsNull(child), m2, newM2),
    +        /* m3 = */ If(IsNull(child), m3, newM3),
    +        /* m4 = */ If(IsNull(child), m4, newM4)
    +      ).take(momentOrder + 1)
    +    } else {
    +      Seq(
    +        /* count = */ n,
    +        /* avg = */ newAvg,
    +        /* m2 = */ newM2,
    +        /* m3 = */ newM3,
    +        /* m4 = */ newM4
    +      ).take(momentOrder + 1)
    +    }
    +  }
     
    -  override def checkInputDataTypes(): TypeCheckResult =
    -    TypeUtils.checkForNumericExpr(child.dataType, s"function $prettyName")
    +  override lazy val mergeExpressions: Seq[Expression] = {
     
    -  override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
    +    val n1 = count.left
    +    val n2 = count.right
    +    val n = n1 + n2
    +    val delta = avg.right - avg.left
    +    val deltaN = If(EqualTo(n, Literal(0.0)), Literal(0.0), delta / n)
    +    val newAvg = avg.left + deltaN * n2
     
    -  /**
    -   * Size of aggregation buffer.
    -   */
    -  private[this] val bufferSize = 5
    +    // higher order moments computed according to:
    +    // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
    +    val newM2 = m2.left + m2.right + delta * deltaN * n1 * n2
    +    val newM3 = if (momentOrder >= 3) {
    --- End diff --
    
    The `if` branch is not necessary because we will trim the result at the end. Shall we follow the approach in `updateExpressions` and remove it? It is still useful to leave a comment here.
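
The pairwise-merge formulas in the `mergeExpressions` diff above (following the higher-order-statistics reference cited there) can likewise be sanity-checked in plain Python. This sketch is an illustration of the math only, not Spark code; `moments_direct` is a hypothetical two-pass reference used just to cross-check:

```python
def moments_direct(xs):
    # Reference implementation: direct central moment sums over the data,
    # used only to cross-check the merge formulas below.
    n = float(len(xs))
    mean = sum(xs) / n
    return (n, mean,
            sum((x - mean) ** 2 for x in xs),
            sum((x - mean) ** 3 for x in xs),
            sum((x - mean) ** 4 for x in xs))

def merge(a, b):
    """Merge two moment buffers (count, avg, m2, m3, m4), mirroring the
    arithmetic in mergeExpressions."""
    n1, avg1, m2a, m3a, m4a = a
    n2, avg2, m2b, m3b, m4b = b
    n = n1 + n2
    delta = avg2 - avg1
    delta_n = 0.0 if n == 0.0 else delta / n
    new_avg = avg1 + delta_n * n2

    new_m2 = m2a + m2b + delta * delta_n * n1 * n2
    new_m3 = (m3a + m3b
              + delta_n * delta_n * delta * n1 * n2 * (n1 - n2)
              + 3.0 * delta_n * (n1 * m2b - n2 * m2a))
    new_m4 = (m4a + m4b
              + delta_n ** 3 * delta * n1 * n2 * (n1 * n1 - n1 * n2 + n2 * n2)
              + 6.0 * delta_n * delta_n * (n1 * n1 * m2b + n2 * n2 * m2a)
              + 4.0 * delta_n * (n1 * m3b - n2 * m3a))
    return n, new_avg, new_m2, new_m3, new_m4
```

Merging the buffers of two partitions should reproduce the moments of the concatenated data, which is exactly what lets each partial aggregate be computed independently.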




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178456217
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12963] Improve performance of stddev/va...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176443884
  
    **[Test build #50297 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50297/consoleFull)** for PR 10960 at commit [`1481bb4`](https://github.com/apache/spark/commit/1481bb4b632ea2d37a703e93ee6b09ff5c9fa8dd).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178758941
  
    Merging this into master, thanks!




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176869406
  
    **[Test build #50386 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50386/consoleFull)** for PR 10960 at commit [`383c193`](https://github.com/apache/spark/commit/383c193715df514e018497e459a1cf08600486f5).




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51534252
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala ---
    @@ -244,7 +244,7 @@ case class WholeStageCodegen(plan: CodegenSupport, children: Seq[SparkPlan])
     
         // try to compile, helpful for debug
         // println(s"${CodeFormatter.format(source)}")
    -    CodeGenerator.compile(source)
    +    // CodeGenerator.compile(source)
    --- End diff --
    
    No reason; I will bring it back.




[GitHub] spark pull request: [SPARK-12963] Improve performance of stddev/va...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176436538
  
    **[Test build #50297 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50297/consoleFull)** for PR 10960 at commit [`1481bb4`](https://github.com/apache/spark/commit/1481bb4b632ea2d37a703e93ee6b09ff5c9fa8dd).




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533812
  
    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala ---
    @@ -133,44 +133,44 @@ class WindowQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleto
             Row("Manufacturer#1", "almond antique burnished rose metallic", 2, 258.10677784349247, 258.10677784349247, 34, 66619.10876874997, 0.811328754177887, 2801.7074999999995),
             Row("Manufacturer#1", "almond antique burnished rose metallic", 2, 273.70217881648085, 273.70217881648085, 2, 74912.88268888886, 1.0, 4128.782222222221),
             Row("Manufacturer#1", "almond antique burnished rose metallic", 2, 273.70217881648085, 273.70217881648085, 34, 74912.88268888886, 1.0, 4128.782222222221),
    -        Row("Manufacturer#1", "almond antique chartreuse lavender yellow", 34, 230.9015158547037, 230.9015158547037, 2, 53315.510023999974, 0.6956393773976641, 2210.7864),
    --- End diff --
    
    It would be nice if we could tolerate small numerical differences in query tests, but that is out of scope here.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51294095
  
    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala ---
    @@ -133,44 +133,44 @@ class WindowQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleto
             Row("Manufacturer#1", "almond antique burnished rose metallic", 2, 258.10677784349247, 258.10677784349247, 34, 66619.10876874997, 0.811328754177887, 2801.7074999999995),
             Row("Manufacturer#1", "almond antique burnished rose metallic", 2, 273.70217881648085, 273.70217881648085, 2, 74912.88268888886, 1.0, 4128.782222222221),
             Row("Manufacturer#1", "almond antique burnished rose metallic", 2, 273.70217881648085, 273.70217881648085, 34, 74912.88268888886, 1.0, 4128.782222222221),
    -        Row("Manufacturer#1", "almond antique chartreuse lavender yellow", 34, 230.9015158547037, 230.9015158547037, 2, 53315.510023999974, 0.6956393773976641, 2210.7864),
    --- End diff --
    
    The new implementation of Corr/Covar has better accuracy, so I updated the tests to match.
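
The accuracy difference is easy to demonstrate in isolation. The sketch below (plain Python, not Spark code) compares the textbook E[x^2] - E[x]^2 variance against a one-pass update on centered deltas, the scheme the declarative aggregates use; the textbook form loses nearly all significant digits when the mean is large relative to the spread:

```python
def variance_naive(xs):
    # Textbook two-sum formula: E[x^2] - E[x]^2. Subtracting two nearly
    # equal large numbers suffers catastrophic cancellation.
    n = len(xs)
    mean = sum(xs) / n
    return sum(x * x for x in xs) / n - mean * mean

def variance_onepass(xs):
    # One-pass update on centered deltas: each step works with small
    # differences, so precision is preserved.
    count, avg, m2 = 0.0, 0.0, 0.0
    for x in xs:
        count += 1.0
        delta = x - avg
        avg += delta / count
        m2 += delta * (x - avg)
    return m2 / count
```

For data like `[1e9 + x for x in (4, 7, 13, 16)]` (true population variance 22.5), the naive formula returns garbage while the one-pass version stays exact.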




[GitHub] spark pull request: [SPARK-12963] Improve performance of stat func...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-176476053
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50304/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178687105
  
    **[Test build #50569 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50569/consoleFull)** for PR 10960 at commit [`5f98588`](https://github.com/apache/spark/commit/5f98588ed25eb8f4b94594922d8ca4c1847abca6).




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533797
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala ---
    @@ -59,6 +59,55 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
         benchmark.run()
       }
     
    +  def testStddev(values: Int): Unit = {
    --- End diff --
    
    The test actually exercises both `stddev` and `kurtosis`, which should be reflected in the name.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-177063535
  
    **[Test build #50432 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50432/consoleFull)** for PR 10960 at commit [`7e57a1a`](https://github.com/apache/spark/commit/7e57a1ab87f27659fd11f63ee86dd073f9687dc9).




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10960#issuecomment-178754162
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51533700
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -57,173 +55,170 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
     
       override def dataType: DataType = DoubleType
     
    -  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val avg = AttributeReference("avg", DoubleType, nullable = false)()
    +  protected val m2 = AttributeReference("m2", DoubleType, nullable = false)()
    +  protected val m3 = AttributeReference("m3", DoubleType, nullable = false)()
    +  protected val m4 = AttributeReference("m4", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes = Seq(count, avg, m2, m3, m4).take(momentOrder + 1)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* avg = */ Literal(0.0),
    +    /* m2 = */ Literal(0.0),
    +    /* m3 = */ Literal(0.0),
    +    /* m4 = */ Literal(0.0)
    +  ).take(momentOrder + 1)
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    +    val delta = child - avg
    +    val deltaN = delta / n
    +    val newAvg = avg + deltaN
    +    val newM2 = m2 + delta * (delta - deltaN)
    +
    +    val delta2 = delta * delta
    +    val deltaN2 = deltaN * deltaN
    +    val newM3 = m3 - Literal(3.0) * deltaN * newM2 + delta * (delta2 - deltaN2)
    --- End diff --
    
    If my understanding is correct, some unnecessary expressions are constructed when we don't need the 3rd- or 4th-order statistics. But this is a one-time overhead, because `updateExpressions` is only evaluated once, before codegen. If that is correct, it would be helpful to leave a comment here explaining why we don't need `if` branches.
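
    For readers following along, the update path above is the standard Welford-style online recurrence. A hypothetical plain-Python transcription of the count/avg/m2 branch (the Catalyst expressions in the diff compute the same arithmetic; the function names here are illustrative only):

    ```python
    def update(state, x):
        """Fold one value into a (count, mean, m2) state, mirroring
        the n / delta / deltaN / newM2 expressions in the diff."""
        count, mean, m2 = state
        n = count + 1.0
        delta = x - mean
        delta_n = delta / n
        new_mean = mean + delta_n
        # m2 accumulates the sum of squared deviations from the running mean
        new_m2 = m2 + delta * (delta - delta_n)
        return (n, new_mean, new_m2)

    def pop_variance(state):
        count, _, m2 = state
        return m2 / count

    state = (0.0, 0.0, 0.0)  # matches the Literal(0.0) initialValues
    for x in (1.0, 2.0, 3.0, 4.0):
        state = update(state, x)
    ```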




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10960#discussion_r51535393
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
    @@ -57,173 +55,170 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w
     
       override def dataType: DataType = DoubleType
     
    -  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
    +  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
    +
    +  protected val count = AttributeReference("count", DoubleType, nullable = false)()
    +  protected val avg = AttributeReference("avg", DoubleType, nullable = false)()
    +  protected val m2 = AttributeReference("m2", DoubleType, nullable = false)()
    +  protected val m3 = AttributeReference("m3", DoubleType, nullable = false)()
    +  protected val m4 = AttributeReference("m4", DoubleType, nullable = false)()
    +
    +  override val aggBufferAttributes = Seq(count, avg, m2, m3, m4).take(momentOrder + 1)
    +
    +  override val initialValues: Seq[Expression] = Seq(
    +    /* count = */ Literal(0.0),
    +    /* avg = */ Literal(0.0),
    +    /* m2 = */ Literal(0.0),
    +    /* m3 = */ Literal(0.0),
    +    /* m4 = */ Literal(0.0)
    +  ).take(momentOrder + 1)
    +
    +  override lazy val updateExpressions: Seq[Expression] = {
    +    val n = count + Literal(1.0)
    +    val delta = child - avg
    +    val deltaN = delta / n
    +    val newAvg = avg + deltaN
    +    val newM2 = m2 + delta * (delta - deltaN)
    +
    +    val delta2 = delta * delta
    +    val deltaN2 = deltaN * deltaN
    +    val newM3 = m3 - Literal(3.0) * deltaN * newM2 + delta * (delta2 - deltaN2)
    +
    +    val newM4 = m4 - Literal(4.0) * deltaN * newM3 - Literal(6.0) * deltaN2 * newM2 +
    +      delta * (delta * delta2 - deltaN * deltaN2)
    +
    +    if (child.nullable) {
    +      Seq(
    +        /* count = */ If(IsNull(child), count, n),
    +        /* avg = */ If(IsNull(child), avg, newAvg),
    +        /* m2 = */ If(IsNull(child), m2, newM2),
    +        /* m3 = */ If(IsNull(child), m3, newM3),
    +        /* m4 = */ If(IsNull(child), m4, newM4)
    +      ).take(momentOrder + 1)
    +    } else {
    +      Seq(
    +        /* count = */ n,
    +        /* avg = */ newAvg,
    +        /* m2 = */ newM2,
    +        /* m3 = */ newM3,
    +        /* m4 = */ newM4
    +      ).take(momentOrder + 1)
    +    }
    +  }
     
    -  override def checkInputDataTypes(): TypeCheckResult =
    -    TypeUtils.checkForNumericExpr(child.dataType, s"function $prettyName")
    +  override lazy val mergeExpressions: Seq[Expression] = {
     
    -  override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)
    +    val n1 = count.left
    +    val n2 = count.right
    +    val n = n1 + n2
    +    val delta = avg.right - avg.left
    +    val deltaN = If(EqualTo(n, Literal(0.0)), Literal(0.0), delta / n)
    +    val newAvg = avg.left + deltaN * n2
     
    -  /**
    -   * Size of aggregation buffer.
    -   */
    -  private[this] val bufferSize = 5
    +    // higher order moments computed according to:
    +    // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
    +    val newM2 = m2.left + m2.right + delta * deltaN * n1 * n2
    +    val newM3 = if (momentOrder >= 3) {
    +      m3.left + m3.right + deltaN * deltaN * delta * n1 * n2 * (n1 - n2) +
    +        Literal(3.0) * deltaN * (n1 * m2.right - n2 * m2.left)
    +    } else {
    +      Literal(0.0)
    +    }
    +    val newM4 = if (momentOrder >= 4) {
    +      m4.left + m4.right +
    +        deltaN * deltaN * deltaN * delta * n1 * n2 * (n1 * n1 - n1 * n2 + n2 * n2) +
    +        Literal(6.0) * deltaN * deltaN * (n1 * n1 * m2.right + n2 * n2 * m2.left) +
    +        Literal(4.0) * deltaN * (n1 * m3.right - n2 * m3.left)
    +    } else {
    +      Literal(0.0)
    +    }
     
    -  override val aggBufferAttributes: Seq[AttributeReference] = Seq.tabulate(bufferSize) { i =>
    -    AttributeReference(s"M$i", DoubleType)()
    +    Seq(
    +      /* count = */ n,
    +      /* avg = */ newAvg,
    +      /* m2 = */ newM2,
    +      /* m3 = */ newM3,
    +      /* m4 = */ newM4
    +    ).take(momentOrder + 1)
       }
    +}
     
    -  // Note: although this simply copies aggBufferAttributes, this common code can not be placed
    -  // in the superclass because that will lead to initialization ordering issues.
    -  override val inputAggBufferAttributes: Seq[AttributeReference] =
    -    aggBufferAttributes.map(_.newInstance())
    -
    -  // buffer offsets
    -  private[this] val nOffset = mutableAggBufferOffset
    -  private[this] val meanOffset = mutableAggBufferOffset + 1
    -  private[this] val secondMomentOffset = mutableAggBufferOffset + 2
    -  private[this] val thirdMomentOffset = mutableAggBufferOffset + 3
    -  private[this] val fourthMomentOffset = mutableAggBufferOffset + 4
    -
    -  // frequently used values for online updates
    -  private[this] var delta = 0.0
    -  private[this] var deltaN = 0.0
    -  private[this] var delta2 = 0.0
    -  private[this] var deltaN2 = 0.0
    -  private[this] var n = 0.0
    -  private[this] var mean = 0.0
    -  private[this] var m2 = 0.0
    -  private[this] var m3 = 0.0
    -  private[this] var m4 = 0.0
    +// Compute the population standard deviation of a column
    +case class StddevPop(child: Expression) extends CentralMomentAgg(child) {
    --- End diff --
    
    We still want a good name to show up when you call `explain`, so I'd like to keep them.
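
    As a side note for reviewers, the merge path in this diff follows the pairwise-combination formulas from the Wikipedia article it cites (Chan et al.). A small, hypothetical Python sketch of the second-order case, checking that merging per-partition states matches a direct computation (names are illustrative; Spark's version is Catalyst expressions):

    ```python
    def moments(xs):
        # Direct (n, mean, M2) for one partition; M2 is the sum of
        # squared deviations from the partition mean.
        n = float(len(xs))
        mean = sum(xs) / n
        m2 = sum((x - mean) ** 2 for x in xs)
        return (n, mean, m2)

    def merge(a, b):
        # Pairwise combination of two (n, mean, M2) states, mirroring
        # mergeExpressions in the diff (same n == 0 guard on deltaN).
        n1, mean1, m2_1 = a
        n2, mean2, m2_2 = b
        n = n1 + n2
        delta = mean2 - mean1
        delta_n = 0.0 if n == 0.0 else delta / n
        mean = mean1 + delta_n * n2
        m2 = m2_1 + m2_2 + delta * delta_n * n1 * n2
        return (n, mean, m2)
    ```

    Merging the states for `[1.0, 2.0]` and `[3.0, 4.0]` should reproduce the moments of the full list, which is exactly the property the partial-aggregation path relies on.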




[GitHub] spark pull request: [SPARK-12913] [SQL] Improve performance of sta...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/10960

