You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by cloud-fan <gi...@git.apache.org> on 2015/09/23 01:13:39 UTC

[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

GitHub user cloud-fan opened a pull request:

    https://github.com/apache/spark/pull/8874

    [SPARK-10765][SQL] use new aggregate interface for hive UDAF

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark hive-agg

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/8874.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #8874
    
----
commit e38674e54110caafca1a79c6192c4c9dd1363dec
Author: Wenchen Fan <cl...@163.com>
Date:   2015-09-22T23:09:53Z

    use new aggregate interface for hive UDAF

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142779974
  
      [Test build #42936 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42936/console) for   PR 8874 at commit [`b0d3572`](https://github.com/apache/spark/commit/b0d3572063e1fbd5b3eaa36c4651a79acfdf8c0e).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142847312
  
      [Test build #42963 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42963/console) for   PR 8874 at commit [`e3ccd65`](https://github.com/apache/spark/commit/e3ccd6503fcdd47b877951c357d8842b304501ce).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142722077
  
      [Test build #42919 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42919/consoleFull) for   PR 8874 at commit [`5803ec2`](https://github.com/apache/spark/commit/5803ec28d319d11e6f0718cf0868ecd40ba80a5f).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8874#discussion_r40239595
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
    @@ -65,9 +66,9 @@ private[hive] class HiveFunctionRegistry(underlying: analysis.FunctionRegistry)
             HiveGenericUDF(new HiveFunctionWrapper(functionClassName), children)
           } else if (
             classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass)) {
    -        HiveGenericUDAF(new HiveFunctionWrapper(functionClassName), children)
    +        HiveUDAFFunction(new HiveFunctionWrapper(functionClassName), children)
           } else if (classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -        HiveUDAF(new HiveFunctionWrapper(functionClassName), children)
    +        HiveUDAFFunction(new HiveFunctionWrapper(functionClassName), children, true)
    --- End diff --
    
    Let's use named parameters to make the meaning of `true` at here clear.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142818022
  
    Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142847839
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8874#discussion_r40283331
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -221,7 +221,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
                 }
     
                 val aggregateOperator =
    -              if (functionsWithDistinct.isEmpty) {
    +              if (aggregateExpressions.map(_.aggregateFunction).exists(!_.supportsPartial)) {
    --- End diff --
    
    I just noticed that if we have some thing like `select count(distinct a), hiveUDAF(b)...`, we will go this branch. So, we will silently ignore the distinct keyword.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/8874


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142721282
  
     Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142750058
  
      [Test build #42919 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42919/console) for   PR 8874 at commit [`5803ec2`](https://github.com/apache/spark/commit/5803ec28d319d11e6f0718cf0868ecd40ba80a5f).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142818641
  
      [Test build #42963 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42963/consoleFull) for   PR 8874 at commit [`e3ccd65`](https://github.com/apache/spark/commit/e3ccd6503fcdd47b877951c357d8842b304501ce).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142847845
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42963/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142721315
  
    Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8874#discussion_r40232090
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
    @@ -586,47 +523,75 @@ private[hive] case class HiveGenericUDTF(
     
     private[hive] case class HiveUDAFFunction(
         funcWrapper: HiveFunctionWrapper,
    -    exprs: Seq[Expression],
    -    base: AggregateExpression1,
    +    children: Seq[Expression],
         isUDAFBridgeRequired: Boolean = false)
    -  extends AggregateFunction1
    -  with HiveInspectors {
    +  extends AggregateFunction2 with HiveInspectors {
     
    -  def this() = this(null, null, null)
    +  def this() = this(null, null)
     
    -  private val resolver =
    +  @transient
    +  private lazy val resolver =
         if (isUDAFBridgeRequired) {
           new GenericUDAFBridge(funcWrapper.createFunction[UDAF]())
         } else {
           funcWrapper.createFunction[AbstractGenericUDAFResolver]()
         }
     
    -  private val inspectors = exprs.map(toInspector).toArray
    +  @transient
    +  private lazy val inspectors = children.map(toInspector).toArray
     
    -  private val function = {
    +  @transient
    +  private lazy val functionAndInspector = {
         val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false)
    -    resolver.getEvaluator(parameterInfo)
    +    val f = resolver.getEvaluator(parameterInfo)
    +    f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
       }
     
    -  private val returnInspector = function.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
    +  @transient
    +  private lazy val function = functionAndInspector._1
     
    -  private val buffer =
    -    function.getNewAggregationBuffer
    +  @transient
    +  private lazy val returnInspector = functionAndInspector._2
    +
    +  @transient
    +  private lazy val buffer = function.getNewAggregationBuffer
     
       override def eval(input: InternalRow): Any = unwrap(function.evaluate(buffer), returnInspector)
     
       @transient
    -  val inputProjection = new InterpretedProjection(exprs)
    +  private lazy val inputProjection = new InterpretedProjection(children)
     
       @transient
    -  protected lazy val cached = new Array[AnyRef](exprs.length)
    +  private lazy val cached = new Array[AnyRef](children.length)
     
       @transient
    -  private lazy val inputDataTypes: Array[DataType] = exprs.map(_.dataType).toArray
    +  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray
    +
    +  // Hive UDAF has its own buffer
    +  override def bufferSchema: StructType = StructType(Nil)
     
    -  def update(input: InternalRow): Unit = {
    -    val inputs = inputProjection(input)
    +  override def update(_buffer: MutableRow, _input: InternalRow): Unit = {
    +    val inputs = inputProjection(_input)
         function.iterate(buffer, wrap(inputs, inspectors, cached, inputDataTypes))
       }
    +
    +  override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
    +    throw new UnsupportedOperationException(
    +      "Hive UDAF doesn't support partial aggregate")
    +  }
    +
    +  override def cloneBufferAttributes: Seq[Attribute] = Nil
    +
    +  override def initialize(buffer: MutableRow): Unit = {}
    --- End diff --
    
    Seems we need to call `buffer.reset` to reset the buffer because this function is be used to process all groups.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8874#discussion_r40253494
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
    @@ -586,47 +523,75 @@ private[hive] case class HiveGenericUDTF(
     
     private[hive] case class HiveUDAFFunction(
         funcWrapper: HiveFunctionWrapper,
    -    exprs: Seq[Expression],
    -    base: AggregateExpression1,
    +    children: Seq[Expression],
         isUDAFBridgeRequired: Boolean = false)
    -  extends AggregateFunction1
    -  with HiveInspectors {
    +  extends AggregateFunction2 with HiveInspectors {
     
    -  def this() = this(null, null, null)
    +  def this() = this(null, null)
     
    -  private val resolver =
    +  @transient
    +  private lazy val resolver =
         if (isUDAFBridgeRequired) {
           new GenericUDAFBridge(funcWrapper.createFunction[UDAF]())
         } else {
           funcWrapper.createFunction[AbstractGenericUDAFResolver]()
         }
     
    -  private val inspectors = exprs.map(toInspector).toArray
    +  @transient
    +  private lazy val inspectors = children.map(toInspector).toArray
     
    -  private val function = {
    +  @transient
    +  private lazy val functionAndInspector = {
         val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false)
    -    resolver.getEvaluator(parameterInfo)
    +    val f = resolver.getEvaluator(parameterInfo)
    +    f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
       }
     
    -  private val returnInspector = function.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
    +  @transient
    +  private lazy val function = functionAndInspector._1
     
    -  private val buffer =
    -    function.getNewAggregationBuffer
    +  @transient
    +  private lazy val returnInspector = functionAndInspector._2
    +
    +  @transient
    +  private lazy val buffer = function.getNewAggregationBuffer
     
       override def eval(input: InternalRow): Any = unwrap(function.evaluate(buffer), returnInspector)
     
       @transient
    -  val inputProjection = new InterpretedProjection(exprs)
    +  private lazy val inputProjection = new InterpretedProjection(children)
     
       @transient
    -  protected lazy val cached = new Array[AnyRef](exprs.length)
    +  private lazy val cached = new Array[AnyRef](children.length)
     
       @transient
    -  private lazy val inputDataTypes: Array[DataType] = exprs.map(_.dataType).toArray
    +  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray
    +
    +  // Hive UDAF has its own buffer
    +  override def bufferSchema: StructType = StructType(Nil)
     
    -  def update(input: InternalRow): Unit = {
    -    val inputs = inputProjection(input)
    +  override def update(_buffer: MutableRow, _input: InternalRow): Unit = {
    +    val inputs = inputProjection(_input)
         function.iterate(buffer, wrap(inputs, inspectors, cached, inputDataTypes))
       }
    +
    +  override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
    +    throw new UnsupportedOperationException(
    +      "Hive UDAF doesn't support partial aggregate")
    +  }
    +
    +  override def cloneBufferAttributes: Seq[Attribute] = Nil
    +
    +  override def initialize(buffer: MutableRow): Unit = {}
    +
    +  override def bufferAttributes: Seq[AttributeReference] = Nil
    +
    +  override def inputTypes: Seq[AbstractDataType] = Nil
    --- End diff --
    
    Using `Nil` here means we don't need to check types for it, and it's the same with using `Seq.fill(children.length)(AnyDataType)`. See https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala#L705-L710


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8874#discussion_r40238553
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala ---
    @@ -37,6 +37,57 @@ object Utils {
           UnsafeProjection.canSupport(groupingExpressions)
       }
     
    +  def planAggregateWithoutPartial(
    +      groupingExpressions: Seq[Expression],
    +      aggregateExpressions: Seq[AggregateExpression2],
    +      aggregateFunctionMap: Map[(AggregateFunction2, Boolean), (AggregateFunction2, Attribute)],
    +      resultExpressions: Seq[NamedExpression],
    +      child: SparkPlan): Seq[SparkPlan] = {
    +
    +    val namedGroupingExpressions = groupingExpressions.map {
    +      case ne: NamedExpression => ne -> ne
    +      // If the expression is not a NamedExpressions, we add an alias.
    +      // So, when we generate the result of the operator, the Aggregate Operator
    +      // can directly get the Seq of attributes representing the grouping expressions.
    +      case other =>
    +        val withAlias = Alias(other, other.toString)()
    +        other -> withAlias
    +    }
    +    val groupExpressionMap = namedGroupingExpressions.toMap
    +    val namedGroupingAttributes = namedGroupingExpressions.map(_._2.toAttribute)
    +
    +    val finalAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete))
    +    val finalAggregateAttributes =
    --- End diff --
    
    `completeAggregateExpressions` and `completeAggregateAttributes`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142463761
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42869/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142759003
  
    Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142750162
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142463760
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142449631
  
      [Test build #42869 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42869/consoleFull) for   PR 8874 at commit [`e38674e`](https://github.com/apache/spark/commit/e38674e54110caafca1a79c6192c4c9dd1363dec).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8874#discussion_r40240921
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
    @@ -586,47 +523,75 @@ private[hive] case class HiveGenericUDTF(
     
     private[hive] case class HiveUDAFFunction(
         funcWrapper: HiveFunctionWrapper,
    -    exprs: Seq[Expression],
    -    base: AggregateExpression1,
    +    children: Seq[Expression],
         isUDAFBridgeRequired: Boolean = false)
    -  extends AggregateFunction1
    -  with HiveInspectors {
    +  extends AggregateFunction2 with HiveInspectors {
     
    -  def this() = this(null, null, null)
    +  def this() = this(null, null)
     
    -  private val resolver =
    +  @transient
    +  private lazy val resolver =
         if (isUDAFBridgeRequired) {
           new GenericUDAFBridge(funcWrapper.createFunction[UDAF]())
         } else {
           funcWrapper.createFunction[AbstractGenericUDAFResolver]()
         }
     
    -  private val inspectors = exprs.map(toInspector).toArray
    +  @transient
    +  private lazy val inspectors = children.map(toInspector).toArray
     
    -  private val function = {
    +  @transient
    +  private lazy val functionAndInspector = {
         val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false)
    -    resolver.getEvaluator(parameterInfo)
    +    val f = resolver.getEvaluator(parameterInfo)
    +    f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
       }
     
    -  private val returnInspector = function.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
    +  @transient
    +  private lazy val function = functionAndInspector._1
     
    -  private val buffer =
    -    function.getNewAggregationBuffer
    +  @transient
    +  private lazy val returnInspector = functionAndInspector._2
    +
    +  @transient
    +  private lazy val buffer = function.getNewAggregationBuffer
     
       override def eval(input: InternalRow): Any = unwrap(function.evaluate(buffer), returnInspector)
     
       @transient
    -  val inputProjection = new InterpretedProjection(exprs)
    +  private lazy val inputProjection = new InterpretedProjection(children)
     
       @transient
    -  protected lazy val cached = new Array[AnyRef](exprs.length)
    +  private lazy val cached = new Array[AnyRef](children.length)
     
       @transient
    -  private lazy val inputDataTypes: Array[DataType] = exprs.map(_.dataType).toArray
    +  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray
    +
    +  // Hive UDAF has its own buffer
    +  override def bufferSchema: StructType = StructType(Nil)
    --- End diff --
    
    How about adding more comments? Something like we do not need to occupy a slot in the buffer because Hive UDAF has its own buffer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142986424
  
    LGTM. Merging to master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8874#discussion_r40261037
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
    @@ -586,47 +524,78 @@ private[hive] case class HiveGenericUDTF(
     
     private[hive] case class HiveUDAFFunction(
         funcWrapper: HiveFunctionWrapper,
    -    exprs: Seq[Expression],
    -    base: AggregateExpression1,
    +    children: Seq[Expression],
         isUDAFBridgeRequired: Boolean = false)
    -  extends AggregateFunction1
    -  with HiveInspectors {
    +  extends AggregateFunction2 with HiveInspectors {
     
    -  def this() = this(null, null, null)
    +  def this() = this(null, null)
     
    -  private val resolver =
    +  @transient
    +  private lazy val resolver =
         if (isUDAFBridgeRequired) {
           new GenericUDAFBridge(funcWrapper.createFunction[UDAF]())
         } else {
           funcWrapper.createFunction[AbstractGenericUDAFResolver]()
         }
     
    -  private val inspectors = exprs.map(toInspector).toArray
    +  @transient
    +  private lazy val inspectors = children.map(toInspector).toArray
     
    -  private val function = {
    +  @transient
    +  private lazy val functionAndInspector = {
         val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false)
    -    resolver.getEvaluator(parameterInfo)
    +    val f = resolver.getEvaluator(parameterInfo)
    +    f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
       }
     
    -  private val returnInspector = function.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
    +  @transient
    +  private lazy val function = functionAndInspector._1
     
    -  private val buffer =
    -    function.getNewAggregationBuffer
    +  @transient
    +  private lazy val returnInspector = functionAndInspector._2
    +
    +  @transient
    +  private[this] var buffer: GenericUDAFEvaluator.AggregationBuffer = _
     
       override def eval(input: InternalRow): Any = unwrap(function.evaluate(buffer), returnInspector)
     
       @transient
    -  val inputProjection = new InterpretedProjection(exprs)
    +  private lazy val inputProjection = new InterpretedProjection(children)
     
       @transient
    -  protected lazy val cached = new Array[AnyRef](exprs.length)
    +  private lazy val cached = new Array[AnyRef](children.length)
     
       @transient
    -  private lazy val inputDataTypes: Array[DataType] = exprs.map(_.dataType).toArray
    +  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray
     
    -  def update(input: InternalRow): Unit = {
    +  // Hive UDAF has its own buffer, so we don't need to occupy a slot in the aggregation
    +  // buffer for it.
    +  override def bufferSchema: StructType = StructType(Nil)
    +
    +  override def update(_buffer: MutableRow, input: InternalRow): Unit = {
         val inputs = inputProjection(input)
         function.iterate(buffer, wrap(inputs, inspectors, cached, inputDataTypes))
       }
    +
    +  override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
    +    throw new UnsupportedOperationException(
    +      "Hive UDAF doesn't support partial aggregate")
    +  }
    +
    +  override def cloneBufferAttributes: Seq[Attribute] = Nil
    +
    +  override def initialize(_buffer: MutableRow): Unit = {
    +    buffer = function.getNewAggregationBuffer
    +  }
    +
    +  override def bufferAttributes: Seq[AttributeReference] = Nil
    +
    +  override def inputTypes: Seq[AbstractDataType] = Nil
    --- End diff --
    
    How about adding comments to explain why it is `Nil`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8874#discussion_r40232721
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala ---
    @@ -169,6 +168,9 @@ abstract class AggregateFunction2
     
       override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String =
         throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
    +
    +  // Some aggregate function like Hive UDAF doesn't support partial aggregation, should override it.
    +  def supportPartial: Boolean = true
    --- End diff --
    
    How about we name it as `supportsPartial`? Also, for the doc, let's make it as Scala doc, e.g.
    ```
    /** Indicates if this function supports partial aggregation. */
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142451072
  
    cc @yhuai 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8874#discussion_r40239298
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala ---
    @@ -37,6 +37,57 @@ object Utils {
           UnsafeProjection.canSupport(groupingExpressions)
       }
     
    +  def planAggregateWithoutPartial(
    +      groupingExpressions: Seq[Expression],
    +      aggregateExpressions: Seq[AggregateExpression2],
    +      aggregateFunctionMap: Map[(AggregateFunction2, Boolean), (AggregateFunction2, Attribute)],
    +      resultExpressions: Seq[NamedExpression],
    +      child: SparkPlan): Seq[SparkPlan] = {
    +
    +    val namedGroupingExpressions = groupingExpressions.map {
    +      case ne: NamedExpression => ne -> ne
    +      // If the expression is not a NamedExpressions, we add an alias.
    +      // So, when we generate the result of the operator, the Aggregate Operator
    +      // can directly get the Seq of attributes representing the grouping expressions.
    +      case other =>
    +        val withAlias = Alias(other, other.toString)()
    +        other -> withAlias
    +    }
    +    val groupExpressionMap = namedGroupingExpressions.toMap
    +    val namedGroupingAttributes = namedGroupingExpressions.map(_._2.toAttribute)
    +
    +    val finalAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete))
    +    val finalAggregateAttributes =
    +      finalAggregateExpressions.map {
    +        expr => aggregateFunctionMap(expr.aggregateFunction, expr.isDistinct)._2
    +      }
    +
    +    val rewrittenResultExpressions = resultExpressions.map { expr =>
    +      expr.transformDown {
    +        case agg: AggregateExpression2 =>
    +          aggregateFunctionMap(agg.aggregateFunction, agg.isDistinct)._2
    --- End diff --
    
    nvm. I looked the wrong place.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142780060
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142758989
  
     Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142759900
  
      [Test build #42936 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42936/consoleFull) for   PR 8874 at commit [`b0d3572`](https://github.com/apache/spark/commit/b0d3572063e1fbd5b3eaa36c4651a79acfdf8c0e).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142463726
  
      [Test build #42869 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42869/console) for   PR 8874 at commit [`e38674e`](https://github.com/apache/spark/commit/e38674e54110caafca1a79c6192c4c9dd1363dec).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142818009
  
     Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8874#discussion_r40261146
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
    @@ -586,47 +524,78 @@ private[hive] case class HiveGenericUDTF(
     
     private[hive] case class HiveUDAFFunction(
    --- End diff --
    
    It will be better to explain that if a query has a HiveUDAF, then we will not do partial aggregation. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142780061
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42936/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142449513
  
    Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8874#discussion_r40239061
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala ---
    @@ -37,6 +37,57 @@ object Utils {
           UnsafeProjection.canSupport(groupingExpressions)
       }
     
    +  def planAggregateWithoutPartial(
    +      groupingExpressions: Seq[Expression],
    +      aggregateExpressions: Seq[AggregateExpression2],
    +      aggregateFunctionMap: Map[(AggregateFunction2, Boolean), (AggregateFunction2, Attribute)],
    +      resultExpressions: Seq[NamedExpression],
    +      child: SparkPlan): Seq[SparkPlan] = {
    +
    +    val namedGroupingExpressions = groupingExpressions.map {
    +      case ne: NamedExpression => ne -> ne
    +      // If the expression is not a NamedExpressions, we add an alias.
    +      // So, when we generate the result of the operator, the Aggregate Operator
    +      // can directly get the Seq of attributes representing the grouping expressions.
    +      case other =>
    +        val withAlias = Alias(other, other.toString)()
    +        other -> withAlias
    +    }
    +    val groupExpressionMap = namedGroupingExpressions.toMap
    +    val namedGroupingAttributes = namedGroupingExpressions.map(_._2.toAttribute)
    +
    +    val finalAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete))
    +    val finalAggregateAttributes =
    +      finalAggregateExpressions.map {
    +        expr => aggregateFunctionMap(expr.aggregateFunction, expr.isDistinct)._2
    +      }
    +
    +    val rewrittenResultExpressions = resultExpressions.map { expr =>
    +      expr.transformDown {
    +        case agg: AggregateExpression2 =>
    +          aggregateFunctionMap(agg.aggregateFunction, agg.isDistinct)._2
    --- End diff --
    
    Do we need to use the following instead?
    ```
    // aggregateFunctionMap contains unique aggregate functions.
    val aggregateFunction =
      aggregateFunctionMap(agg.aggregateFunction, agg.isDistinct)._1
      aggregateFunction.asInstanceOf[AlgebraicAggregate].evaluateExpression
    ```
    Basically `rewrittenResultExpressions` will do the work of evaluating aggregate functions and generate final results.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8874#discussion_r40243608
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
    @@ -586,47 +523,75 @@ private[hive] case class HiveGenericUDTF(
     
     private[hive] case class HiveUDAFFunction(
         funcWrapper: HiveFunctionWrapper,
    -    exprs: Seq[Expression],
    -    base: AggregateExpression1,
    +    children: Seq[Expression],
         isUDAFBridgeRequired: Boolean = false)
    -  extends AggregateFunction1
    -  with HiveInspectors {
    +  extends AggregateFunction2 with HiveInspectors {
     
    -  def this() = this(null, null, null)
    +  def this() = this(null, null)
     
    -  private val resolver =
    +  @transient
    +  private lazy val resolver =
         if (isUDAFBridgeRequired) {
           new GenericUDAFBridge(funcWrapper.createFunction[UDAF]())
         } else {
           funcWrapper.createFunction[AbstractGenericUDAFResolver]()
         }
     
    -  private val inspectors = exprs.map(toInspector).toArray
    +  @transient
    +  private lazy val inspectors = children.map(toInspector).toArray
     
    -  private val function = {
    +  @transient
    +  private lazy val functionAndInspector = {
         val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false)
    -    resolver.getEvaluator(parameterInfo)
    +    val f = resolver.getEvaluator(parameterInfo)
    +    f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
       }
     
    -  private val returnInspector = function.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
    +  @transient
    +  private lazy val function = functionAndInspector._1
     
    -  private val buffer =
    -    function.getNewAggregationBuffer
    +  @transient
    +  private lazy val returnInspector = functionAndInspector._2
    +
    +  @transient
    +  private lazy val buffer = function.getNewAggregationBuffer
     
       override def eval(input: InternalRow): Any = unwrap(function.evaluate(buffer), returnInspector)
     
       @transient
    -  val inputProjection = new InterpretedProjection(exprs)
    +  private lazy val inputProjection = new InterpretedProjection(children)
     
       @transient
    -  protected lazy val cached = new Array[AnyRef](exprs.length)
    +  private lazy val cached = new Array[AnyRef](children.length)
     
       @transient
    -  private lazy val inputDataTypes: Array[DataType] = exprs.map(_.dataType).toArray
    +  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray
    +
    +  // Hive UDAF has its own buffer
    +  override def bufferSchema: StructType = StructType(Nil)
     
    -  def update(input: InternalRow): Unit = {
    -    val inputs = inputProjection(input)
    +  override def update(_buffer: MutableRow, _input: InternalRow): Unit = {
    +    val inputs = inputProjection(_input)
         function.iterate(buffer, wrap(inputs, inspectors, cached, inputDataTypes))
       }
    +
    +  override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
    +    throw new UnsupportedOperationException(
    +      "Hive UDAF doesn't support partial aggregate")
    +  }
    +
    +  override def cloneBufferAttributes: Seq[Attribute] = Nil
    +
    +  override def initialize(buffer: MutableRow): Unit = {}
    +
    +  override def bufferAttributes: Seq[AttributeReference] = Nil
    +
    +  override def inputTypes: Seq[AbstractDataType] = Nil
    --- End diff --
    
    We are relying on Hive to check the data types of input parameters, right? If so, is it better to use `Seq.fill(children.length)(AnyDataType)` instead of `Nil` and add comments at here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8874#discussion_r40286642
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -221,7 +221,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
                 }
     
                 val aggregateOperator =
    -              if (functionsWithDistinct.isEmpty) {
    +              if (aggregateExpressions.map(_.aggregateFunction).exists(!_.supportsPartial)) {
    --- End diff --
    
    Ah, good catch! We should check `distinct` here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8874#discussion_r40257910
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
    @@ -586,47 +523,75 @@ private[hive] case class HiveGenericUDTF(
     
     private[hive] case class HiveUDAFFunction(
         funcWrapper: HiveFunctionWrapper,
    -    exprs: Seq[Expression],
    -    base: AggregateExpression1,
    +    children: Seq[Expression],
         isUDAFBridgeRequired: Boolean = false)
    -  extends AggregateFunction1
    -  with HiveInspectors {
    +  extends AggregateFunction2 with HiveInspectors {
     
    -  def this() = this(null, null, null)
    +  def this() = this(null, null)
     
    -  private val resolver =
    +  @transient
    +  private lazy val resolver =
         if (isUDAFBridgeRequired) {
           new GenericUDAFBridge(funcWrapper.createFunction[UDAF]())
         } else {
           funcWrapper.createFunction[AbstractGenericUDAFResolver]()
         }
     
    -  private val inspectors = exprs.map(toInspector).toArray
    +  @transient
    +  private lazy val inspectors = children.map(toInspector).toArray
     
    -  private val function = {
    +  @transient
    +  private lazy val functionAndInspector = {
         val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false)
    -    resolver.getEvaluator(parameterInfo)
    +    val f = resolver.getEvaluator(parameterInfo)
    +    f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
       }
     
    -  private val returnInspector = function.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
    +  @transient
    +  private lazy val function = functionAndInspector._1
     
    -  private val buffer =
    -    function.getNewAggregationBuffer
    +  @transient
    +  private lazy val returnInspector = functionAndInspector._2
    +
    +  @transient
    +  private lazy val buffer = function.getNewAggregationBuffer
     
       override def eval(input: InternalRow): Any = unwrap(function.evaluate(buffer), returnInspector)
     
       @transient
    -  val inputProjection = new InterpretedProjection(exprs)
    +  private lazy val inputProjection = new InterpretedProjection(children)
     
       @transient
    -  protected lazy val cached = new Array[AnyRef](exprs.length)
    +  private lazy val cached = new Array[AnyRef](children.length)
     
       @transient
    -  private lazy val inputDataTypes: Array[DataType] = exprs.map(_.dataType).toArray
    +  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray
    +
    +  // Hive UDAF has its own buffer
    +  override def bufferSchema: StructType = StructType(Nil)
     
    -  def update(input: InternalRow): Unit = {
    -    val inputs = inputProjection(input)
    +  override def update(_buffer: MutableRow, _input: InternalRow): Unit = {
    +    val inputs = inputProjection(_input)
         function.iterate(buffer, wrap(inputs, inspectors, cached, inputDataTypes))
       }
    +
    +  override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
    +    throw new UnsupportedOperationException(
    +      "Hive UDAF doesn't support partial aggregate")
    +  }
    +
    +  override def cloneBufferAttributes: Seq[Attribute] = Nil
    +
    +  override def initialize(buffer: MutableRow): Unit = {}
    +
    +  override def bufferAttributes: Seq[AttributeReference] = Nil
    +
    +  override def inputTypes: Seq[AbstractDataType] = Nil
    --- End diff --
    
    yeah. My thought is `Nil` is relying on that implementation detail. While, using `AnyDataType` is more explicit. What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142750164
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42919/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10765][SQL] use new aggregate interface...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8874#issuecomment-142449493
  
     Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org