You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by davies <gi...@git.apache.org> on 2015/12/09 20:13:28 UTC

[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

GitHub user davies opened a pull request:

    https://github.com/apache/spark/pull/10228

    [SPARK-12213] [SQL] use multiple partitions for single distinct query

    Currently, we could generate different plans for query with single distinct (depends on spark.sql.specializeSingleDistinctAggPlanning), one works better on low cardinality columns, the other 
    works better for high cardinality column (default one).
    
    This PR change to generate a single plan (three aggregations and two exchanges), which work better in both cases, then we could safely remove the flag `spark.sql.specializeSingleDistinctAggPlanning` (introduced in 1.6).
    
    cc @yhuai @nongli @marmbrus 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/davies/spark single_distinct

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/10228.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #10228
    
----
commit e3f2e79a5e6da3fcfe7d0441dd00cb31f3a30992
Author: Davies Liu <da...@databricks.com>
Date:   2015-12-09T19:05:25Z

    use multiple partitions for single distinct query

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-164088946
  
    **[Test build #47601 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47601/consoleFull)** for PR 10228 at commit [`71e0b1c`](https://github.com/apache/spark/commit/71e0b1c29ec50c6ce6590dc43997e66f1b4011ac).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-164313814
  
    @hvanhovell How about we merge this first and we take a look at how to use a single rule to handle aggregation queries with distinct?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10228#discussion_r47440934
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala ---
    @@ -49,41 +47,20 @@ abstract class AggregationIterator(
       // Initializing functions.
       ///////////////////////////////////////////////////////////////////////////
     
    -  // An Seq of all AggregateExpressions.
    -  // It is important that all AggregateExpressions with the mode Partial, PartialMerge or Final
    -  // are at the beginning of the allAggregateExpressions.
    -  protected val allAggregateExpressions =
    -    nonCompleteAggregateExpressions ++ completeAggregateExpressions
    -
       require(
    -    allAggregateExpressions.map(_.mode).distinct.length <= 2,
    -    s"$allAggregateExpressions are not supported becuase they have more than 2 distinct modes.")
    -
    -  /**
    -   * The distinct modes of AggregateExpressions. Right now, we can handle the following mode:
    --- End diff --
    
    Added


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-164360720
  
    Cool. I am merging this one to master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10228#discussion_r47281934
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala ---
    @@ -326,73 +192,61 @@ abstract class AggregationIterator(
         } else {
           safeOutputRow
         }
    -
    -    aggregationMode match {
    -      // Partial-only or PartialMerge-only: every output row is basically the values of
    -      // the grouping expressions and the corresponding aggregation buffer.
    -      case (Some(Partial), None) | (Some(PartialMerge), None) =>
    -        // Because we cannot copy a joinedRow containing a UnsafeRow (UnsafeRow does not
    -        // support generic getter), we create a mutable projection to output the
    -        // JoinedRow(currentGroupingKey, currentBuffer)
    -        val bufferSchema = nonCompleteAggregateFunctions.flatMap(_.aggBufferAttributes)
    -        val resultProjection =
    -          newMutableProjection(
    -            groupingKeyAttributes ++ bufferSchema,
    -            groupingKeyAttributes ++ bufferSchema)()
    -        resultProjection.target(mutableOutput)
    -
    -        (currentGroupingKey: InternalRow, currentBuffer: MutableRow) => {
    -          resultProjection(rowToBeEvaluated(currentGroupingKey, currentBuffer))
    -          // rowToBeEvaluated(currentGroupingKey, currentBuffer)
    -        }
    -
    +    val modes = aggregateExpressions.map(_.mode).distinct
    +    if (!modes.contains(Final) && !modes.contains(Complete)) {
    +      // Because we cannot copy a joinedRow containing a UnsafeRow (UnsafeRow does not
    +      // support generic getter), we create a mutable projection to output the
    +      // JoinedRow(currentGroupingKey, currentBuffer)
    +      val bufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes)
    +      val resultProjection =
    +        newMutableProjection(
    +          groupingKeyAttributes ++ bufferSchema,
    +          groupingKeyAttributes ++ bufferSchema)()
    +      resultProjection.target(mutableOutput)
    +      (currentGroupingKey: InternalRow, currentBuffer: MutableRow) => {
    +        resultProjection(rowToBeEvaluated(currentGroupingKey, currentBuffer))
    +      }
    +    } else if (aggregateExpressions.nonEmpty) {
           // Final-only, Complete-only and Final-Complete: every output row contains values representing
           // resultExpressions.
    -      case (Some(Final), None) | (Some(Final) | None, Some(Complete)) =>
    -        val bufferSchemata =
    -          allAggregateFunctions.flatMap(_.aggBufferAttributes)
    -        val evalExpressions = allAggregateFunctions.map {
    -          case ae: DeclarativeAggregate => ae.evaluateExpression
    -          case agg: AggregateFunction => NoOp
    -        }
    -        val expressionAggEvalProjection = newMutableProjection(evalExpressions, bufferSchemata)()
    -        val aggregateResultSchema = nonCompleteAggregateAttributes ++ completeAggregateAttributes
    -        // TODO: Use unsafe row.
    -        val aggregateResult = new SpecificMutableRow(aggregateResultSchema.map(_.dataType))
    -        expressionAggEvalProjection.target(aggregateResult)
    -        val resultProjection =
    -          newMutableProjection(
    -            resultExpressions, groupingKeyAttributes ++ aggregateResultSchema)()
    -        resultProjection.target(mutableOutput)
    -
    -        (currentGroupingKey: InternalRow, currentBuffer: MutableRow) => {
    -          // Generate results for all expression-based aggregate functions.
    -          expressionAggEvalProjection(currentBuffer)
    -          // Generate results for all imperative aggregate functions.
    -          var i = 0
    -          while (i < allImperativeAggregateFunctions.length) {
    -            aggregateResult.update(
    -              allImperativeAggregateFunctionPositions(i),
    -              allImperativeAggregateFunctions(i).eval(currentBuffer))
    -            i += 1
    -          }
    -          resultProjection(rowToBeEvaluated(currentGroupingKey, aggregateResult))
    +      val bufferSchemata = aggregateFunctions.flatMap(_.aggBufferAttributes)
    +      val evalExpressions = aggregateFunctions.map {
    +        case ae: DeclarativeAggregate => ae.evaluateExpression
    +        case agg: AggregateFunction => NoOp
    +      }
    +      val expressionAggEvalProjection = newMutableProjection(evalExpressions, bufferSchemata)()
    +      val aggregateResultSchema = aggregateAttributes
    +      // TODO: Use unsafe row.
    +      val aggregateResult = new SpecificMutableRow(aggregateResultSchema.map(_.dataType))
    +      expressionAggEvalProjection.target(aggregateResult)
    +
    +      val resultProjection =
    +        newMutableProjection(
    +          resultExpressions, groupingKeyAttributes ++ aggregateResultSchema)()
    +      resultProjection.target(mutableOutput)
    +
    +      (currentGroupingKey: InternalRow, currentBuffer: MutableRow) => {
    +        // Generate results for all expression-based aggregate functions.
    +        expressionAggEvalProjection(currentBuffer)
    +        // Generate results for all imperative aggregate functions.
    +        var i = 0
    +        while (i < allImperativeAggregateFunctions.length) {
    +          aggregateResult.update(
    +            allImperativeAggregateFunctionPositions(i),
    +            allImperativeAggregateFunctions(i).eval(currentBuffer))
    +          i += 1
             }
    -
    +        resultProjection(rowToBeEvaluated(currentGroupingKey, aggregateResult))
    +      }
    +    } else {
           // Grouping-only: we only output values of grouping expressions.
    --- End diff --
    
    the output could be fewer than grouping key.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10228#discussion_r47276459
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala ---
    @@ -49,41 +47,20 @@ abstract class AggregationIterator(
       // Initializing functions.
       ///////////////////////////////////////////////////////////////////////////
     
    -  // An Seq of all AggregateExpressions.
    -  // It is important that all AggregateExpressions with the mode Partial, PartialMerge or Final
    -  // are at the beginning of the allAggregateExpressions.
    -  protected val allAggregateExpressions =
    -    nonCompleteAggregateExpressions ++ completeAggregateExpressions
    -
       require(
    -    allAggregateExpressions.map(_.mode).distinct.length <= 2,
    -    s"$allAggregateExpressions are not supported becuase they have more than 2 distinct modes.")
    -
    -  /**
    -   * The distinct modes of AggregateExpressions. Right now, we can handle the following mode:
    --- End diff --
    
    Can you add a similar comment for the new version? Which combinations are valid now?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10228#discussion_r47439997
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala ---
    @@ -154,248 +131,112 @@ abstract class AggregationIterator(
       }
     
       // All imperative AggregateFunctions.
    -  private[this] val allImperativeAggregateFunctions: Array[ImperativeAggregate] =
    +  protected[this] val allImperativeAggregateFunctions: Array[ImperativeAggregate] =
         allImperativeAggregateFunctionPositions
    -      .map(allAggregateFunctions)
    +      .map(aggregateFunctions)
           .map(_.asInstanceOf[ImperativeAggregate])
     
    -  ///////////////////////////////////////////////////////////////////////////
    -  // Methods and fields used by sub-classes.
    -  ///////////////////////////////////////////////////////////////////////////
    -
       // Initializing functions used to process a row.
    -  protected val processRow: (MutableRow, InternalRow) => Unit = {
    -    val rowToBeProcessed = new JoinedRow
    -    val aggregationBufferSchema = allAggregateFunctions.flatMap(_.aggBufferAttributes)
    -    aggregationMode match {
    -      // Partial-only
    -      case (Some(Partial), None) =>
    -        val updateExpressions = nonCompleteAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.updateExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        val expressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          expressionAggUpdateProjection.target(currentBuffer)
    -          // Process all expression-based aggregate functions.
    -          expressionAggUpdateProjection(rowToBeProcessed(currentBuffer, row))
    -          // Process all imperative aggregate functions.
    -          var i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // PartialMerge-only or Final-only
    -      case (Some(PartialMerge), None) | (Some(Final), None) =>
    -        val inputAggregationBufferSchema = if (initialInputBufferOffset == 0) {
    -          // If initialInputBufferOffset, the input value does not contain
    -          // grouping keys.
    -          // This part is pretty hacky.
    -          allAggregateFunctions.flatMap(_.inputAggBufferAttributes).toSeq
    -        } else {
    -          groupingKeyAttributes ++ allAggregateFunctions.flatMap(_.inputAggBufferAttributes)
    -        }
    -        // val inputAggregationBufferSchema =
    -        //  groupingKeyAttributes ++
    -        //    allAggregateFunctions.flatMap(_.cloneBufferAttributes)
    -        val mergeExpressions = nonCompleteAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.mergeExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        // This projection is used to merge buffer values for all expression-based aggregates.
    -        val expressionAggMergeProjection =
    -          newMutableProjection(
    -            mergeExpressions,
    -            aggregationBufferSchema ++ inputAggregationBufferSchema)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          // Process all expression-based aggregate functions.
    -          expressionAggMergeProjection.target(currentBuffer)(rowToBeProcessed(currentBuffer, row))
    -          // Process all imperative aggregate functions.
    -          var i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).merge(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // Final-Complete
    -      case (Some(Final), Some(Complete)) =>
    -        val completeAggregateFunctions: Array[AggregateFunction] =
    -          allAggregateFunctions.takeRight(completeAggregateExpressions.length)
    -        // All imperative aggregate functions with mode Complete.
    -        val completeImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          completeAggregateFunctions.collect { case func: ImperativeAggregate => func }
    -
    -        // The first initialInputBufferOffset values of the input aggregation buffer is
    -        // for grouping expressions and distinct columns.
    -        val groupingAttributesAndDistinctColumns = valueAttributes.take(initialInputBufferOffset)
    -
    -        val completeOffsetExpressions =
    -          Seq.fill(completeAggregateFunctions.map(_.aggBufferAttributes.length).sum)(NoOp)
    -        // We do not touch buffer values of aggregate functions with the Final mode.
    -        val finalOffsetExpressions =
    -          Seq.fill(nonCompleteAggregateFunctions.map(_.aggBufferAttributes.length).sum)(NoOp)
    -
    -        val mergeInputSchema =
    -          aggregationBufferSchema ++
    -            groupingAttributesAndDistinctColumns ++
    -            nonCompleteAggregateFunctions.flatMap(_.inputAggBufferAttributes)
    -        val mergeExpressions =
    -          nonCompleteAggregateFunctions.flatMap {
    -            case ae: DeclarativeAggregate => ae.mergeExpressions
    -            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -          } ++ completeOffsetExpressions
    -        val finalExpressionAggMergeProjection =
    -          newMutableProjection(mergeExpressions, mergeInputSchema)()
    -
    -        val updateExpressions =
    -          finalOffsetExpressions ++ completeAggregateFunctions.flatMap {
    -            case ae: DeclarativeAggregate => ae.updateExpressions
    -            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -          }
    -        val completeExpressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          val input = rowToBeProcessed(currentBuffer, row)
    -          // For all aggregate functions with mode Complete, update buffers.
    -          completeExpressionAggUpdateProjection.target(currentBuffer)(input)
    -          var i = 0
    -          while (i < completeImperativeAggregateFunctions.length) {
    -            completeImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -
    -          // For all aggregate functions with mode Final, merge buffers.
    -          finalExpressionAggMergeProjection.target(currentBuffer)(input)
    -          i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).merge(currentBuffer, row)
    -            i += 1
    +  protected def generateProcessRow(
    +    expressions: Seq[AggregateExpression],
    +    functions: Seq[AggregateFunction],
    +    inputAttributes: Seq[Attribute]): (MutableRow, InternalRow) => Unit = {
    --- End diff --
    
    format


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-164356726
  
    @yhuai LGTM. Yea, lets merge this one, I'll create a ticket for the distinct rules


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163747267
  
    **[Test build #47536 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47536/consoleFull)** for PR 10228 at commit [`a9eae30`](https://github.com/apache/spark/commit/a9eae303166d6c3ba1f80a22265482b9f4d0a525).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163845995
  
    **[Test build #47570 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47570/consoleFull)** for PR 10228 at commit [`740e725`](https://github.com/apache/spark/commit/740e7258f384931ae0856ac96779790617aed914).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163747500
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47536/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10228#discussion_r47439998
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala ---
    @@ -154,248 +131,112 @@ abstract class AggregationIterator(
       }
     
       // All imperative AggregateFunctions.
    -  private[this] val allImperativeAggregateFunctions: Array[ImperativeAggregate] =
    +  protected[this] val allImperativeAggregateFunctions: Array[ImperativeAggregate] =
         allImperativeAggregateFunctionPositions
    -      .map(allAggregateFunctions)
    +      .map(aggregateFunctions)
           .map(_.asInstanceOf[ImperativeAggregate])
     
    -  ///////////////////////////////////////////////////////////////////////////
    -  // Methods and fields used by sub-classes.
    -  ///////////////////////////////////////////////////////////////////////////
    -
       // Initializing functions used to process a row.
    -  protected val processRow: (MutableRow, InternalRow) => Unit = {
    -    val rowToBeProcessed = new JoinedRow
    -    val aggregationBufferSchema = allAggregateFunctions.flatMap(_.aggBufferAttributes)
    -    aggregationMode match {
    -      // Partial-only
    -      case (Some(Partial), None) =>
    -        val updateExpressions = nonCompleteAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.updateExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        val expressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          expressionAggUpdateProjection.target(currentBuffer)
    -          // Process all expression-based aggregate functions.
    -          expressionAggUpdateProjection(rowToBeProcessed(currentBuffer, row))
    -          // Process all imperative aggregate functions.
    -          var i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // PartialMerge-only or Final-only
    -      case (Some(PartialMerge), None) | (Some(Final), None) =>
    -        val inputAggregationBufferSchema = if (initialInputBufferOffset == 0) {
    -          // If initialInputBufferOffset, the input value does not contain
    -          // grouping keys.
    -          // This part is pretty hacky.
    -          allAggregateFunctions.flatMap(_.inputAggBufferAttributes).toSeq
    -        } else {
    -          groupingKeyAttributes ++ allAggregateFunctions.flatMap(_.inputAggBufferAttributes)
    -        }
    -        // val inputAggregationBufferSchema =
    -        //  groupingKeyAttributes ++
    -        //    allAggregateFunctions.flatMap(_.cloneBufferAttributes)
    -        val mergeExpressions = nonCompleteAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.mergeExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        // This projection is used to merge buffer values for all expression-based aggregates.
    -        val expressionAggMergeProjection =
    -          newMutableProjection(
    -            mergeExpressions,
    -            aggregationBufferSchema ++ inputAggregationBufferSchema)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          // Process all expression-based aggregate functions.
    -          expressionAggMergeProjection.target(currentBuffer)(rowToBeProcessed(currentBuffer, row))
    -          // Process all imperative aggregate functions.
    -          var i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).merge(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // Final-Complete
    -      case (Some(Final), Some(Complete)) =>
    -        val completeAggregateFunctions: Array[AggregateFunction] =
    -          allAggregateFunctions.takeRight(completeAggregateExpressions.length)
    -        // All imperative aggregate functions with mode Complete.
    -        val completeImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          completeAggregateFunctions.collect { case func: ImperativeAggregate => func }
    -
    -        // The first initialInputBufferOffset values of the input aggregation buffer is
    -        // for grouping expressions and distinct columns.
    -        val groupingAttributesAndDistinctColumns = valueAttributes.take(initialInputBufferOffset)
    -
    -        val completeOffsetExpressions =
    -          Seq.fill(completeAggregateFunctions.map(_.aggBufferAttributes.length).sum)(NoOp)
    -        // We do not touch buffer values of aggregate functions with the Final mode.
    -        val finalOffsetExpressions =
    -          Seq.fill(nonCompleteAggregateFunctions.map(_.aggBufferAttributes.length).sum)(NoOp)
    -
    -        val mergeInputSchema =
    -          aggregationBufferSchema ++
    -            groupingAttributesAndDistinctColumns ++
    -            nonCompleteAggregateFunctions.flatMap(_.inputAggBufferAttributes)
    -        val mergeExpressions =
    -          nonCompleteAggregateFunctions.flatMap {
    -            case ae: DeclarativeAggregate => ae.mergeExpressions
    -            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -          } ++ completeOffsetExpressions
    -        val finalExpressionAggMergeProjection =
    -          newMutableProjection(mergeExpressions, mergeInputSchema)()
    -
    -        val updateExpressions =
    -          finalOffsetExpressions ++ completeAggregateFunctions.flatMap {
    -            case ae: DeclarativeAggregate => ae.updateExpressions
    -            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -          }
    -        val completeExpressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          val input = rowToBeProcessed(currentBuffer, row)
    -          // For all aggregate functions with mode Complete, update buffers.
    -          completeExpressionAggUpdateProjection.target(currentBuffer)(input)
    -          var i = 0
    -          while (i < completeImperativeAggregateFunctions.length) {
    -            completeImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -
    -          // For all aggregate functions with mode Final, merge buffers.
    -          finalExpressionAggMergeProjection.target(currentBuffer)(input)
    -          i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).merge(currentBuffer, row)
    -            i += 1
    +  protected def generateProcessRow(
    +    expressions: Seq[AggregateExpression],
    +    functions: Seq[AggregateFunction],
    +    inputAttributes: Seq[Attribute]): (MutableRow, InternalRow) => Unit = {
    +    val joinedRow = new JoinedRow
    +    if (expressions.nonEmpty) {
    +      val mergeExpressions = functions.zipWithIndex.flatMap {
    +        case (ae: DeclarativeAggregate, i) =>
    +          expressions(i).mode match {
    +            case Partial | Complete => ae.updateExpressions
    +            case PartialMerge | Final => ae.mergeExpressions
               }
    -        }
    -
    -      // Complete-only
    -      case (None, Some(Complete)) =>
    -        val completeAggregateFunctions: Array[AggregateFunction] =
    -          allAggregateFunctions.takeRight(completeAggregateExpressions.length)
    -        // All imperative aggregate functions with mode Complete.
    -        val completeImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          completeAggregateFunctions.collect { case func: ImperativeAggregate => func }
    -
    -        val updateExpressions =
    -          completeAggregateFunctions.flatMap {
    -            case ae: DeclarativeAggregate => ae.updateExpressions
    -            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -          }
    -        val completeExpressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          val input = rowToBeProcessed(currentBuffer, row)
    -          // For all aggregate functions with mode Complete, update buffers.
    -          completeExpressionAggUpdateProjection.target(currentBuffer)(input)
    -          var i = 0
    -          while (i < completeImperativeAggregateFunctions.length) {
    -            completeImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    +        case (agg: AggregateFunction, _) => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    +      }
    +      val updateFunctions = functions.zipWithIndex.collect {
    +        case (ae: ImperativeAggregate, i) =>
    +          expressions(i).mode match {
    +            case Partial | Complete =>
    +              (buffer: MutableRow, row: InternalRow) => ae.update(buffer, row)
    +            case PartialMerge | Final =>
    +              (buffer: MutableRow, row: InternalRow) => ae.merge(buffer, row)
               }
    +      }
    +      // This projection is used to merge buffer values for all expression-based aggregates.
    +      val aggregationBufferSchema = functions.flatMap(_.aggBufferAttributes)
    +      val updateProjection =
    +        newMutableProjection(mergeExpressions, aggregationBufferSchema ++ inputAttributes)()
    +
    +      (currentBuffer: MutableRow, row: InternalRow) => {
    +        // Process all expression-based aggregate functions.
    +        updateProjection.target(currentBuffer)(joinedRow(currentBuffer, row))
    +        // Process all imperative aggregate functions.
    +        var i = 0
    +        while (i < updateFunctions.length) {
    +          updateFunctions(i)(currentBuffer, row)
    +          i += 1
             }
    -
    +      }
    +    } else {
           // Grouping only.
    -      case (None, None) => (currentBuffer: MutableRow, row: InternalRow) => {}
    -
    -      case other =>
    -        sys.error(
    -          s"Could not evaluate ${nonCompleteAggregateExpressions} because we do not " +
    -            s"support evaluate modes $other in this iterator.")
    +      (currentBuffer: MutableRow, row: InternalRow) => {}
         }
       }
     
    -  // Initializing the function used to generate the output row.
    -  protected val generateOutput: (InternalRow, MutableRow) => InternalRow = {
    -    val rowToBeEvaluated = new JoinedRow
    -    val safeOutputRow = new SpecificMutableRow(resultExpressions.map(_.dataType))
    -    val mutableOutput = if (outputsUnsafeRows) {
    -      UnsafeProjection.create(resultExpressions.map(_.dataType).toArray).apply(safeOutputRow)
    -    } else {
    -      safeOutputRow
    -    }
    -
    -    aggregationMode match {
    -      // Partial-only or PartialMerge-only: every output row is basically the values of
    -      // the grouping expressions and the corresponding aggregation buffer.
    -      case (Some(Partial), None) | (Some(PartialMerge), None) =>
    -        // Because we cannot copy a joinedRow containing a UnsafeRow (UnsafeRow does not
    -        // support generic getter), we create a mutable projection to output the
    -        // JoinedRow(currentGroupingKey, currentBuffer)
    -        val bufferSchema = nonCompleteAggregateFunctions.flatMap(_.aggBufferAttributes)
    -        val resultProjection =
    -          newMutableProjection(
    -            groupingKeyAttributes ++ bufferSchema,
    -            groupingKeyAttributes ++ bufferSchema)()
    -        resultProjection.target(mutableOutput)
    +  protected val processRow: (MutableRow, InternalRow) => Unit =
    +    generateProcessRow(aggregateExpressions, aggregateFunctions, inputAttributes)
     
    -        (currentGroupingKey: InternalRow, currentBuffer: MutableRow) => {
    -          resultProjection(rowToBeEvaluated(currentGroupingKey, currentBuffer))
    -          // rowToBeEvaluated(currentGroupingKey, currentBuffer)
    -        }
    +  protected val groupingProjection: UnsafeProjection =
    +    UnsafeProjection.create(groupingExpressions, inputAttributes)
    +  protected val groupingAttributes = groupingExpressions.map(_.toAttribute)
     
    -      // Final-only, Complete-only and Final-Complete: every output row contains values representing
    -      // resultExpressions.
    -      case (Some(Final), None) | (Some(Final) | None, Some(Complete)) =>
    -        val bufferSchemata =
    -          allAggregateFunctions.flatMap(_.aggBufferAttributes)
    -        val evalExpressions = allAggregateFunctions.map {
    -          case ae: DeclarativeAggregate => ae.evaluateExpression
    -          case agg: AggregateFunction => NoOp
    -        }
    -        val expressionAggEvalProjection = newMutableProjection(evalExpressions, bufferSchemata)()
    -        val aggregateResultSchema = nonCompleteAggregateAttributes ++ completeAggregateAttributes
    -        // TODO: Use unsafe row.
    -        val aggregateResult = new SpecificMutableRow(aggregateResultSchema.map(_.dataType))
    -        expressionAggEvalProjection.target(aggregateResult)
    -        val resultProjection =
    -          newMutableProjection(
    -            resultExpressions, groupingKeyAttributes ++ aggregateResultSchema)()
    -        resultProjection.target(mutableOutput)
    -
    -        (currentGroupingKey: InternalRow, currentBuffer: MutableRow) => {
    -          // Generate results for all expression-based aggregate functions.
    -          expressionAggEvalProjection(currentBuffer)
    -          // Generate results for all imperative aggregate functions.
    -          var i = 0
    -          while (i < allImperativeAggregateFunctions.length) {
    -            aggregateResult.update(
    -              allImperativeAggregateFunctionPositions(i),
    -              allImperativeAggregateFunctions(i).eval(currentBuffer))
    -            i += 1
    -          }
    -          resultProjection(rowToBeEvaluated(currentGroupingKey, aggregateResult))
    +  // Initializing the function used to generate the output row.
    +  protected def generateResultProjection(): (UnsafeRow, MutableRow) => UnsafeRow = {
    +    val joinedRow = new JoinedRow
    +    val modes = aggregateExpressions.map(_.mode).distinct
    +    val bufferAttributes = aggregateFunctions.flatMap(_.aggBufferAttributes)
    +    if (modes.contains(Final) || modes.contains(Complete)) {
    --- End diff --
    
    If it is invalid to have a `Partial` or a `PartialMerge`, how about we add a check and comments at here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-164089027
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10228#discussion_r47444204
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala ---
    @@ -165,237 +137,100 @@ abstract class AggregationIterator(
     
       // Initializing functions used to process a row.
       protected val processRow: (MutableRow, InternalRow) => Unit = {
    -    val rowToBeProcessed = new JoinedRow
    -    val aggregationBufferSchema = allAggregateFunctions.flatMap(_.aggBufferAttributes)
    -    aggregationMode match {
    -      // Partial-only
    -      case (Some(Partial), None) =>
    -        val updateExpressions = nonCompleteAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.updateExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        val expressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          expressionAggUpdateProjection.target(currentBuffer)
    -          // Process all expression-based aggregate functions.
    -          expressionAggUpdateProjection(rowToBeProcessed(currentBuffer, row))
    -          // Process all imperative aggregate functions.
    -          var i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // PartialMerge-only or Final-only
    -      case (Some(PartialMerge), None) | (Some(Final), None) =>
    -        val inputAggregationBufferSchema = if (initialInputBufferOffset == 0) {
    -          // If initialInputBufferOffset, the input value does not contain
    -          // grouping keys.
    -          // This part is pretty hacky.
    -          allAggregateFunctions.flatMap(_.inputAggBufferAttributes).toSeq
    -        } else {
    -          groupingKeyAttributes ++ allAggregateFunctions.flatMap(_.inputAggBufferAttributes)
    -        }
    -        // val inputAggregationBufferSchema =
    -        //  groupingKeyAttributes ++
    -        //    allAggregateFunctions.flatMap(_.cloneBufferAttributes)
    -        val mergeExpressions = nonCompleteAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.mergeExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        // This projection is used to merge buffer values for all expression-based aggregates.
    -        val expressionAggMergeProjection =
    -          newMutableProjection(
    -            mergeExpressions,
    -            aggregationBufferSchema ++ inputAggregationBufferSchema)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          // Process all expression-based aggregate functions.
    -          expressionAggMergeProjection.target(currentBuffer)(rowToBeProcessed(currentBuffer, row))
    -          // Process all imperative aggregate functions.
    -          var i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).merge(currentBuffer, row)
    -            i += 1
    +    val joinedRow = new JoinedRow
    +    if (aggregateExpressions.nonEmpty) {
    +      val mergeExpressions = aggregateFunctions.zipWithIndex.flatMap {
    +        case (ae: DeclarativeAggregate, i) =>
    +          aggregateExpressions(i).mode match {
    +            case Partial | Complete => ae.updateExpressions
    +            case PartialMerge | Final => ae.mergeExpressions
               }
    -        }
    -
    -      // Final-Complete
    -      case (Some(Final), Some(Complete)) =>
    -        val completeAggregateFunctions: Array[AggregateFunction] =
    -          allAggregateFunctions.takeRight(completeAggregateExpressions.length)
    -        // All imperative aggregate functions with mode Complete.
    -        val completeImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          completeAggregateFunctions.collect { case func: ImperativeAggregate => func }
    -
    -        // The first initialInputBufferOffset values of the input aggregation buffer is
    -        // for grouping expressions and distinct columns.
    -        val groupingAttributesAndDistinctColumns = valueAttributes.take(initialInputBufferOffset)
    -
    -        val completeOffsetExpressions =
    -          Seq.fill(completeAggregateFunctions.map(_.aggBufferAttributes.length).sum)(NoOp)
    -        // We do not touch buffer values of aggregate functions with the Final mode.
    -        val finalOffsetExpressions =
    -          Seq.fill(nonCompleteAggregateFunctions.map(_.aggBufferAttributes.length).sum)(NoOp)
    -
    -        val mergeInputSchema =
    -          aggregationBufferSchema ++
    -            groupingAttributesAndDistinctColumns ++
    -            nonCompleteAggregateFunctions.flatMap(_.inputAggBufferAttributes)
    -        val mergeExpressions =
    -          nonCompleteAggregateFunctions.flatMap {
    -            case ae: DeclarativeAggregate => ae.mergeExpressions
    -            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -          } ++ completeOffsetExpressions
    -        val finalExpressionAggMergeProjection =
    -          newMutableProjection(mergeExpressions, mergeInputSchema)()
    -
    -        val updateExpressions =
    -          finalOffsetExpressions ++ completeAggregateFunctions.flatMap {
    -            case ae: DeclarativeAggregate => ae.updateExpressions
    -            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -          }
    -        val completeExpressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          val input = rowToBeProcessed(currentBuffer, row)
    -          // For all aggregate functions with mode Complete, update buffers.
    -          completeExpressionAggUpdateProjection.target(currentBuffer)(input)
    -          var i = 0
    -          while (i < completeImperativeAggregateFunctions.length) {
    -            completeImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -
    -          // For all aggregate functions with mode Final, merge buffers.
    -          finalExpressionAggMergeProjection.target(currentBuffer)(input)
    -          i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).merge(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // Complete-only
    -      case (None, Some(Complete)) =>
    -        val completeAggregateFunctions: Array[AggregateFunction] =
    -          allAggregateFunctions.takeRight(completeAggregateExpressions.length)
    -        // All imperative aggregate functions with mode Complete.
    -        val completeImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          completeAggregateFunctions.collect { case func: ImperativeAggregate => func }
    -
    -        val updateExpressions =
    -          completeAggregateFunctions.flatMap {
    -            case ae: DeclarativeAggregate => ae.updateExpressions
    -            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -          }
    -        val completeExpressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          val input = rowToBeProcessed(currentBuffer, row)
    -          // For all aggregate functions with mode Complete, update buffers.
    -          completeExpressionAggUpdateProjection.target(currentBuffer)(input)
    -          var i = 0
    -          while (i < completeImperativeAggregateFunctions.length) {
    -            completeImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    +        case (agg: AggregateFunction, _) => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    +      }
    +      val updateFunctions = aggregateFunctions.zipWithIndex.collect {
    --- End diff --
    
    ```zip(aggregateExpressions)``` ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-164264208
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163806563
  
    **[Test build #47559 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47559/consoleFull)** for PR 10228 at commit [`8262ad8`](https://github.com/apache/spark/commit/8262ad89eb49da0c36b76c8089d92d9ce5b45c52).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10228#discussion_r47439996
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala ---
    @@ -33,64 +33,46 @@ import scala.collection.mutable.ArrayBuffer
      *    is used to generate result.
      */
     abstract class AggregationIterator(
    -    groupingKeyAttributes: Seq[Attribute],
    -    valueAttributes: Seq[Attribute],
    -    nonCompleteAggregateExpressions: Seq[AggregateExpression],
    -    nonCompleteAggregateAttributes: Seq[Attribute],
    -    completeAggregateExpressions: Seq[AggregateExpression],
    -    completeAggregateAttributes: Seq[Attribute],
    +    groupingExpressions: Seq[NamedExpression],
    +    inputAttributes: Seq[Attribute],
    +    aggregateExpressions: Seq[AggregateExpression],
    +    aggregateAttributes: Seq[Attribute],
         initialInputBufferOffset: Int,
         resultExpressions: Seq[NamedExpression],
    -    newMutableProjection: (Seq[Expression], Seq[Attribute]) => (() => MutableProjection),
    -    outputsUnsafeRows: Boolean)
    -  extends Iterator[InternalRow] with Logging {
    +    newMutableProjection: (Seq[Expression], Seq[Attribute]) => (() => MutableProjection))
    +  extends Iterator[UnsafeRow] with Logging {
     
       ///////////////////////////////////////////////////////////////////////////
       // Initializing functions.
       ///////////////////////////////////////////////////////////////////////////
     
    -  // An Seq of all AggregateExpressions.
    -  // It is important that all AggregateExpressions with the mode Partial, PartialMerge or Final
    -  // are at the beginning of the allAggregateExpressions.
    -  protected val allAggregateExpressions =
    -    nonCompleteAggregateExpressions ++ completeAggregateExpressions
    -
    -  require(
    -    allAggregateExpressions.map(_.mode).distinct.length <= 2,
    -    s"$allAggregateExpressions are not supported becuase they have more than 2 distinct modes.")
    -
    -  /**
    -   * The distinct modes of AggregateExpressions. Right now, we can handle the following mode:
    -   *  - Partial-only: all AggregateExpressions have the mode of Partial;
    -   *  - PartialMerge-only: all AggregateExpressions have the mode of PartialMerge);
    -   *  - Final-only: all AggregateExpressions have the mode of Final;
    -   *  - Final-Complete: some AggregateExpressions have the mode of Final and
    -   *    others have the mode of Complete;
    -   *  - Complete-only: nonCompleteAggregateExpressions is empty and we have AggregateExpressions
    -   *    with mode Complete in completeAggregateExpressions; and
    -   *  - Grouping-only: there is no AggregateExpression.
    -   */
    -  protected val aggregationMode: (Option[AggregateMode], Option[AggregateMode]) =
    -    nonCompleteAggregateExpressions.map(_.mode).distinct.headOption ->
    -      completeAggregateExpressions.map(_.mode).distinct.headOption
    +  {
    +    val modes = aggregateExpressions.map(_.mode).distinct.toSet
    +    require(modes.size <= 2,
    +      s"$aggregateExpressions are not supported because they have more than 2 distinct modes.")
    +    require(modes.subsetOf(Set(Partial, PartialMerge)) || modes.subsetOf(Set(Final, Complete)),
    +      s"$aggregateExpressions can't have Partial/PartialMerge and Final/Complete in the same time.")
    +  }
     
       // Initialize all AggregateFunctions by binding references if necessary,
       // and set inputBufferOffset and mutableBufferOffset.
    -  protected val allAggregateFunctions: Array[AggregateFunction] = {
    +  protected def initializeAggregateFunctions(
    +    expressions: Seq[AggregateExpression],
    +    startingInputBufferOffset: Int): Array[AggregateFunction] = {
    --- End diff --
    
    format


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163760324
  
    **[Test build #47544 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47544/consoleFull)** for PR 10228 at commit [`3f1ea7f`](https://github.com/apache/spark/commit/3f1ea7f86ec834e5a32af58de74f919b354f0d61).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163783931
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47544/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163759063
  
    @hvanhovell The difficulty of doing this in DistinctAggregateRewriter is that DistinctAggregateRewriter will generate two logical plan, but some aggregation functions have different updateExpression and mergeExpression, so will could not work as update-merge-update-final, they should work as update-merge-merge-final.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-164219079
  
    @davies I only left a few minor comments. Overall, it is very cool!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10228#discussion_r47439999
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala ---
    @@ -154,248 +131,112 @@ abstract class AggregationIterator(
       }
     
       // All imperative AggregateFunctions.
    -  private[this] val allImperativeAggregateFunctions: Array[ImperativeAggregate] =
    +  protected[this] val allImperativeAggregateFunctions: Array[ImperativeAggregate] =
         allImperativeAggregateFunctionPositions
    -      .map(allAggregateFunctions)
    +      .map(aggregateFunctions)
           .map(_.asInstanceOf[ImperativeAggregate])
     
    -  ///////////////////////////////////////////////////////////////////////////
    -  // Methods and fields used by sub-classes.
    -  ///////////////////////////////////////////////////////////////////////////
    -
       // Initializing functions used to process a row.
    -  protected val processRow: (MutableRow, InternalRow) => Unit = {
    -    val rowToBeProcessed = new JoinedRow
    -    val aggregationBufferSchema = allAggregateFunctions.flatMap(_.aggBufferAttributes)
    -    aggregationMode match {
    -      // Partial-only
    -      case (Some(Partial), None) =>
    -        val updateExpressions = nonCompleteAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.updateExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        val expressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          expressionAggUpdateProjection.target(currentBuffer)
    -          // Process all expression-based aggregate functions.
    -          expressionAggUpdateProjection(rowToBeProcessed(currentBuffer, row))
    -          // Process all imperative aggregate functions.
    -          var i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // PartialMerge-only or Final-only
    -      case (Some(PartialMerge), None) | (Some(Final), None) =>
    -        val inputAggregationBufferSchema = if (initialInputBufferOffset == 0) {
    -          // If initialInputBufferOffset, the input value does not contain
    -          // grouping keys.
    -          // This part is pretty hacky.
    -          allAggregateFunctions.flatMap(_.inputAggBufferAttributes).toSeq
    -        } else {
    -          groupingKeyAttributes ++ allAggregateFunctions.flatMap(_.inputAggBufferAttributes)
    -        }
    -        // val inputAggregationBufferSchema =
    -        //  groupingKeyAttributes ++
    -        //    allAggregateFunctions.flatMap(_.cloneBufferAttributes)
    -        val mergeExpressions = nonCompleteAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.mergeExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        // This projection is used to merge buffer values for all expression-based aggregates.
    -        val expressionAggMergeProjection =
    -          newMutableProjection(
    -            mergeExpressions,
    -            aggregationBufferSchema ++ inputAggregationBufferSchema)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          // Process all expression-based aggregate functions.
    -          expressionAggMergeProjection.target(currentBuffer)(rowToBeProcessed(currentBuffer, row))
    -          // Process all imperative aggregate functions.
    -          var i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).merge(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // Final-Complete
    -      case (Some(Final), Some(Complete)) =>
    -        val completeAggregateFunctions: Array[AggregateFunction] =
    -          allAggregateFunctions.takeRight(completeAggregateExpressions.length)
    -        // All imperative aggregate functions with mode Complete.
    -        val completeImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          completeAggregateFunctions.collect { case func: ImperativeAggregate => func }
    -
    -        // The first initialInputBufferOffset values of the input aggregation buffer is
    -        // for grouping expressions and distinct columns.
    -        val groupingAttributesAndDistinctColumns = valueAttributes.take(initialInputBufferOffset)
    -
    -        val completeOffsetExpressions =
    -          Seq.fill(completeAggregateFunctions.map(_.aggBufferAttributes.length).sum)(NoOp)
    -        // We do not touch buffer values of aggregate functions with the Final mode.
    -        val finalOffsetExpressions =
    -          Seq.fill(nonCompleteAggregateFunctions.map(_.aggBufferAttributes.length).sum)(NoOp)
    -
    -        val mergeInputSchema =
    -          aggregationBufferSchema ++
    -            groupingAttributesAndDistinctColumns ++
    -            nonCompleteAggregateFunctions.flatMap(_.inputAggBufferAttributes)
    -        val mergeExpressions =
    -          nonCompleteAggregateFunctions.flatMap {
    -            case ae: DeclarativeAggregate => ae.mergeExpressions
    -            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -          } ++ completeOffsetExpressions
    -        val finalExpressionAggMergeProjection =
    -          newMutableProjection(mergeExpressions, mergeInputSchema)()
    -
    -        val updateExpressions =
    -          finalOffsetExpressions ++ completeAggregateFunctions.flatMap {
    -            case ae: DeclarativeAggregate => ae.updateExpressions
    -            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -          }
    -        val completeExpressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          val input = rowToBeProcessed(currentBuffer, row)
    -          // For all aggregate functions with mode Complete, update buffers.
    -          completeExpressionAggUpdateProjection.target(currentBuffer)(input)
    -          var i = 0
    -          while (i < completeImperativeAggregateFunctions.length) {
    -            completeImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -
    -          // For all aggregate functions with mode Final, merge buffers.
    -          finalExpressionAggMergeProjection.target(currentBuffer)(input)
    -          i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).merge(currentBuffer, row)
    -            i += 1
    +  protected def generateProcessRow(
    +    expressions: Seq[AggregateExpression],
    +    functions: Seq[AggregateFunction],
    +    inputAttributes: Seq[Attribute]): (MutableRow, InternalRow) => Unit = {
    +    val joinedRow = new JoinedRow
    +    if (expressions.nonEmpty) {
    +      val mergeExpressions = functions.zipWithIndex.flatMap {
    +        case (ae: DeclarativeAggregate, i) =>
    +          expressions(i).mode match {
    +            case Partial | Complete => ae.updateExpressions
    +            case PartialMerge | Final => ae.mergeExpressions
               }
    -        }
    -
    -      // Complete-only
    -      case (None, Some(Complete)) =>
    -        val completeAggregateFunctions: Array[AggregateFunction] =
    -          allAggregateFunctions.takeRight(completeAggregateExpressions.length)
    -        // All imperative aggregate functions with mode Complete.
    -        val completeImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          completeAggregateFunctions.collect { case func: ImperativeAggregate => func }
    -
    -        val updateExpressions =
    -          completeAggregateFunctions.flatMap {
    -            case ae: DeclarativeAggregate => ae.updateExpressions
    -            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -          }
    -        val completeExpressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          val input = rowToBeProcessed(currentBuffer, row)
    -          // For all aggregate functions with mode Complete, update buffers.
    -          completeExpressionAggUpdateProjection.target(currentBuffer)(input)
    -          var i = 0
    -          while (i < completeImperativeAggregateFunctions.length) {
    -            completeImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    +        case (agg: AggregateFunction, _) => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    +      }
    +      val updateFunctions = functions.zipWithIndex.collect {
    +        case (ae: ImperativeAggregate, i) =>
    +          expressions(i).mode match {
    +            case Partial | Complete =>
    +              (buffer: MutableRow, row: InternalRow) => ae.update(buffer, row)
    +            case PartialMerge | Final =>
    +              (buffer: MutableRow, row: InternalRow) => ae.merge(buffer, row)
               }
    +      }
    +      // This projection is used to merge buffer values for all expression-based aggregates.
    +      val aggregationBufferSchema = functions.flatMap(_.aggBufferAttributes)
    +      val updateProjection =
    +        newMutableProjection(mergeExpressions, aggregationBufferSchema ++ inputAttributes)()
    +
    +      (currentBuffer: MutableRow, row: InternalRow) => {
    +        // Process all expression-based aggregate functions.
    +        updateProjection.target(currentBuffer)(joinedRow(currentBuffer, row))
    +        // Process all imperative aggregate functions.
    +        var i = 0
    +        while (i < updateFunctions.length) {
    +          updateFunctions(i)(currentBuffer, row)
    +          i += 1
             }
    -
    +      }
    +    } else {
           // Grouping only.
    -      case (None, None) => (currentBuffer: MutableRow, row: InternalRow) => {}
    -
    -      case other =>
    -        sys.error(
    -          s"Could not evaluate ${nonCompleteAggregateExpressions} because we do not " +
    -            s"support evaluate modes $other in this iterator.")
    +      (currentBuffer: MutableRow, row: InternalRow) => {}
         }
       }
     
    -  // Initializing the function used to generate the output row.
    -  protected val generateOutput: (InternalRow, MutableRow) => InternalRow = {
    -    val rowToBeEvaluated = new JoinedRow
    -    val safeOutputRow = new SpecificMutableRow(resultExpressions.map(_.dataType))
    -    val mutableOutput = if (outputsUnsafeRows) {
    -      UnsafeProjection.create(resultExpressions.map(_.dataType).toArray).apply(safeOutputRow)
    -    } else {
    -      safeOutputRow
    -    }
    -
    -    aggregationMode match {
    -      // Partial-only or PartialMerge-only: every output row is basically the values of
    -      // the grouping expressions and the corresponding aggregation buffer.
    -      case (Some(Partial), None) | (Some(PartialMerge), None) =>
    -        // Because we cannot copy a joinedRow containing a UnsafeRow (UnsafeRow does not
    -        // support generic getter), we create a mutable projection to output the
    -        // JoinedRow(currentGroupingKey, currentBuffer)
    -        val bufferSchema = nonCompleteAggregateFunctions.flatMap(_.aggBufferAttributes)
    -        val resultProjection =
    -          newMutableProjection(
    -            groupingKeyAttributes ++ bufferSchema,
    -            groupingKeyAttributes ++ bufferSchema)()
    -        resultProjection.target(mutableOutput)
    +  protected val processRow: (MutableRow, InternalRow) => Unit =
    +    generateProcessRow(aggregateExpressions, aggregateFunctions, inputAttributes)
     
    -        (currentGroupingKey: InternalRow, currentBuffer: MutableRow) => {
    -          resultProjection(rowToBeEvaluated(currentGroupingKey, currentBuffer))
    -          // rowToBeEvaluated(currentGroupingKey, currentBuffer)
    -        }
    +  protected val groupingProjection: UnsafeProjection =
    +    UnsafeProjection.create(groupingExpressions, inputAttributes)
    +  protected val groupingAttributes = groupingExpressions.map(_.toAttribute)
     
    -      // Final-only, Complete-only and Final-Complete: every output row contains values representing
    -      // resultExpressions.
    -      case (Some(Final), None) | (Some(Final) | None, Some(Complete)) =>
    -        val bufferSchemata =
    -          allAggregateFunctions.flatMap(_.aggBufferAttributes)
    -        val evalExpressions = allAggregateFunctions.map {
    -          case ae: DeclarativeAggregate => ae.evaluateExpression
    -          case agg: AggregateFunction => NoOp
    -        }
    -        val expressionAggEvalProjection = newMutableProjection(evalExpressions, bufferSchemata)()
    -        val aggregateResultSchema = nonCompleteAggregateAttributes ++ completeAggregateAttributes
    -        // TODO: Use unsafe row.
    -        val aggregateResult = new SpecificMutableRow(aggregateResultSchema.map(_.dataType))
    -        expressionAggEvalProjection.target(aggregateResult)
    -        val resultProjection =
    -          newMutableProjection(
    -            resultExpressions, groupingKeyAttributes ++ aggregateResultSchema)()
    -        resultProjection.target(mutableOutput)
    -
    -        (currentGroupingKey: InternalRow, currentBuffer: MutableRow) => {
    -          // Generate results for all expression-based aggregate functions.
    -          expressionAggEvalProjection(currentBuffer)
    -          // Generate results for all imperative aggregate functions.
    -          var i = 0
    -          while (i < allImperativeAggregateFunctions.length) {
    -            aggregateResult.update(
    -              allImperativeAggregateFunctionPositions(i),
    -              allImperativeAggregateFunctions(i).eval(currentBuffer))
    -            i += 1
    -          }
    -          resultProjection(rowToBeEvaluated(currentGroupingKey, aggregateResult))
    +  // Initializing the function used to generate the output row.
    +  protected def generateResultProjection(): (UnsafeRow, MutableRow) => UnsafeRow = {
    +    val joinedRow = new JoinedRow
    +    val modes = aggregateExpressions.map(_.mode).distinct
    +    val bufferAttributes = aggregateFunctions.flatMap(_.aggBufferAttributes)
    +    if (modes.contains(Final) || modes.contains(Complete)) {
    +      val evalExpressions = aggregateFunctions.map {
    +        case ae: DeclarativeAggregate => ae.evaluateExpression
    +        case agg: AggregateFunction => NoOp
    +      }
    +      val aggregateResult = new SpecificMutableRow(aggregateAttributes.map(_.dataType))
    +      val expressionAggEvalProjection = newMutableProjection(evalExpressions, bufferAttributes)()
    +      expressionAggEvalProjection.target(aggregateResult)
    +
    +      val resultProjection =
    +        UnsafeProjection.create(resultExpressions, groupingAttributes ++ aggregateAttributes)
    +
    +      (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => {
    +        // Generate results for all expression-based aggregate functions.
    +        expressionAggEvalProjection(currentBuffer)
    +        // Generate results for all imperative aggregate functions.
    +        var i = 0
    +        while (i < allImperativeAggregateFunctions.length) {
    +          aggregateResult.update(
    +            allImperativeAggregateFunctionPositions(i),
    +            allImperativeAggregateFunctions(i).eval(currentBuffer))
    +          i += 1
             }
    -
    +        resultProjection(joinedRow(currentGroupingKey, aggregateResult))
    +      }
    +    } else if (modes.contains(Partial) || modes.contains(PartialMerge)) {
    --- End diff --
    
    same here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10228#discussion_r47440907
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala ---
    @@ -154,248 +131,112 @@ abstract class AggregationIterator(
       }
     
       // All imperative AggregateFunctions.
    -  private[this] val allImperativeAggregateFunctions: Array[ImperativeAggregate] =
    +  protected[this] val allImperativeAggregateFunctions: Array[ImperativeAggregate] =
         allImperativeAggregateFunctionPositions
    -      .map(allAggregateFunctions)
    +      .map(aggregateFunctions)
           .map(_.asInstanceOf[ImperativeAggregate])
     
    -  ///////////////////////////////////////////////////////////////////////////
    -  // Methods and fields used by sub-classes.
    -  ///////////////////////////////////////////////////////////////////////////
    -
       // Initializing functions used to process a row.
    -  protected val processRow: (MutableRow, InternalRow) => Unit = {
    -    val rowToBeProcessed = new JoinedRow
    -    val aggregationBufferSchema = allAggregateFunctions.flatMap(_.aggBufferAttributes)
    -    aggregationMode match {
    -      // Partial-only
    -      case (Some(Partial), None) =>
    -        val updateExpressions = nonCompleteAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.updateExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        val expressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          expressionAggUpdateProjection.target(currentBuffer)
    -          // Process all expression-based aggregate functions.
    -          expressionAggUpdateProjection(rowToBeProcessed(currentBuffer, row))
    -          // Process all imperative aggregate functions.
    -          var i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // PartialMerge-only or Final-only
    -      case (Some(PartialMerge), None) | (Some(Final), None) =>
    -        val inputAggregationBufferSchema = if (initialInputBufferOffset == 0) {
    -          // If initialInputBufferOffset, the input value does not contain
    -          // grouping keys.
    -          // This part is pretty hacky.
    -          allAggregateFunctions.flatMap(_.inputAggBufferAttributes).toSeq
    -        } else {
    -          groupingKeyAttributes ++ allAggregateFunctions.flatMap(_.inputAggBufferAttributes)
    -        }
    -        // val inputAggregationBufferSchema =
    -        //  groupingKeyAttributes ++
    -        //    allAggregateFunctions.flatMap(_.cloneBufferAttributes)
    -        val mergeExpressions = nonCompleteAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.mergeExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        // This projection is used to merge buffer values for all expression-based aggregates.
    -        val expressionAggMergeProjection =
    -          newMutableProjection(
    -            mergeExpressions,
    -            aggregationBufferSchema ++ inputAggregationBufferSchema)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          // Process all expression-based aggregate functions.
    -          expressionAggMergeProjection.target(currentBuffer)(rowToBeProcessed(currentBuffer, row))
    -          // Process all imperative aggregate functions.
    -          var i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).merge(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // Final-Complete
    -      case (Some(Final), Some(Complete)) =>
    -        val completeAggregateFunctions: Array[AggregateFunction] =
    -          allAggregateFunctions.takeRight(completeAggregateExpressions.length)
    -        // All imperative aggregate functions with mode Complete.
    -        val completeImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          completeAggregateFunctions.collect { case func: ImperativeAggregate => func }
    -
    -        // The first initialInputBufferOffset values of the input aggregation buffer is
    -        // for grouping expressions and distinct columns.
    -        val groupingAttributesAndDistinctColumns = valueAttributes.take(initialInputBufferOffset)
    -
    -        val completeOffsetExpressions =
    -          Seq.fill(completeAggregateFunctions.map(_.aggBufferAttributes.length).sum)(NoOp)
    -        // We do not touch buffer values of aggregate functions with the Final mode.
    -        val finalOffsetExpressions =
    -          Seq.fill(nonCompleteAggregateFunctions.map(_.aggBufferAttributes.length).sum)(NoOp)
    -
    -        val mergeInputSchema =
    -          aggregationBufferSchema ++
    -            groupingAttributesAndDistinctColumns ++
    -            nonCompleteAggregateFunctions.flatMap(_.inputAggBufferAttributes)
    -        val mergeExpressions =
    -          nonCompleteAggregateFunctions.flatMap {
    -            case ae: DeclarativeAggregate => ae.mergeExpressions
    -            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -          } ++ completeOffsetExpressions
    -        val finalExpressionAggMergeProjection =
    -          newMutableProjection(mergeExpressions, mergeInputSchema)()
    -
    -        val updateExpressions =
    -          finalOffsetExpressions ++ completeAggregateFunctions.flatMap {
    -            case ae: DeclarativeAggregate => ae.updateExpressions
    -            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -          }
    -        val completeExpressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          val input = rowToBeProcessed(currentBuffer, row)
    -          // For all aggregate functions with mode Complete, update buffers.
    -          completeExpressionAggUpdateProjection.target(currentBuffer)(input)
    -          var i = 0
    -          while (i < completeImperativeAggregateFunctions.length) {
    -            completeImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -
    -          // For all aggregate functions with mode Final, merge buffers.
    -          finalExpressionAggMergeProjection.target(currentBuffer)(input)
    -          i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).merge(currentBuffer, row)
    -            i += 1
    +  protected def generateProcessRow(
    +    expressions: Seq[AggregateExpression],
    +    functions: Seq[AggregateFunction],
    +    inputAttributes: Seq[Attribute]): (MutableRow, InternalRow) => Unit = {
    +    val joinedRow = new JoinedRow
    +    if (expressions.nonEmpty) {
    +      val mergeExpressions = functions.zipWithIndex.flatMap {
    +        case (ae: DeclarativeAggregate, i) =>
    +          expressions(i).mode match {
    +            case Partial | Complete => ae.updateExpressions
    +            case PartialMerge | Final => ae.mergeExpressions
               }
    -        }
    -
    -      // Complete-only
    -      case (None, Some(Complete)) =>
    -        val completeAggregateFunctions: Array[AggregateFunction] =
    -          allAggregateFunctions.takeRight(completeAggregateExpressions.length)
    -        // All imperative aggregate functions with mode Complete.
    -        val completeImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          completeAggregateFunctions.collect { case func: ImperativeAggregate => func }
    -
    -        val updateExpressions =
    -          completeAggregateFunctions.flatMap {
    -            case ae: DeclarativeAggregate => ae.updateExpressions
    -            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -          }
    -        val completeExpressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          val input = rowToBeProcessed(currentBuffer, row)
    -          // For all aggregate functions with mode Complete, update buffers.
    -          completeExpressionAggUpdateProjection.target(currentBuffer)(input)
    -          var i = 0
    -          while (i < completeImperativeAggregateFunctions.length) {
    -            completeImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    +        case (agg: AggregateFunction, _) => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    +      }
    +      val updateFunctions = functions.zipWithIndex.collect {
    +        case (ae: ImperativeAggregate, i) =>
    +          expressions(i).mode match {
    +            case Partial | Complete =>
    +              (buffer: MutableRow, row: InternalRow) => ae.update(buffer, row)
    +            case PartialMerge | Final =>
    +              (buffer: MutableRow, row: InternalRow) => ae.merge(buffer, row)
               }
    +      }
    +      // This projection is used to merge buffer values for all expression-based aggregates.
    +      val aggregationBufferSchema = functions.flatMap(_.aggBufferAttributes)
    +      val updateProjection =
    +        newMutableProjection(mergeExpressions, aggregationBufferSchema ++ inputAttributes)()
    +
    +      (currentBuffer: MutableRow, row: InternalRow) => {
    +        // Process all expression-based aggregate functions.
    +        updateProjection.target(currentBuffer)(joinedRow(currentBuffer, row))
    +        // Process all imperative aggregate functions.
    +        var i = 0
    +        while (i < updateFunctions.length) {
    +          updateFunctions(i)(currentBuffer, row)
    +          i += 1
             }
    -
    +      }
    +    } else {
           // Grouping only.
    -      case (None, None) => (currentBuffer: MutableRow, row: InternalRow) => {}
    -
    -      case other =>
    -        sys.error(
    -          s"Could not evaluate ${nonCompleteAggregateExpressions} because we do not " +
    -            s"support evaluate modes $other in this iterator.")
    +      (currentBuffer: MutableRow, row: InternalRow) => {}
         }
       }
     
    -  // Initializing the function used to generate the output row.
    -  protected val generateOutput: (InternalRow, MutableRow) => InternalRow = {
    -    val rowToBeEvaluated = new JoinedRow
    -    val safeOutputRow = new SpecificMutableRow(resultExpressions.map(_.dataType))
    -    val mutableOutput = if (outputsUnsafeRows) {
    -      UnsafeProjection.create(resultExpressions.map(_.dataType).toArray).apply(safeOutputRow)
    -    } else {
    -      safeOutputRow
    -    }
    -
    -    aggregationMode match {
    -      // Partial-only or PartialMerge-only: every output row is basically the values of
    -      // the grouping expressions and the corresponding aggregation buffer.
    -      case (Some(Partial), None) | (Some(PartialMerge), None) =>
    -        // Because we cannot copy a joinedRow containing a UnsafeRow (UnsafeRow does not
    -        // support generic getter), we create a mutable projection to output the
    -        // JoinedRow(currentGroupingKey, currentBuffer)
    -        val bufferSchema = nonCompleteAggregateFunctions.flatMap(_.aggBufferAttributes)
    -        val resultProjection =
    -          newMutableProjection(
    -            groupingKeyAttributes ++ bufferSchema,
    -            groupingKeyAttributes ++ bufferSchema)()
    -        resultProjection.target(mutableOutput)
    +  protected val processRow: (MutableRow, InternalRow) => Unit =
    +    generateProcessRow(aggregateExpressions, aggregateFunctions, inputAttributes)
     
    -        (currentGroupingKey: InternalRow, currentBuffer: MutableRow) => {
    -          resultProjection(rowToBeEvaluated(currentGroupingKey, currentBuffer))
    -          // rowToBeEvaluated(currentGroupingKey, currentBuffer)
    -        }
    +  protected val groupingProjection: UnsafeProjection =
    +    UnsafeProjection.create(groupingExpressions, inputAttributes)
    +  protected val groupingAttributes = groupingExpressions.map(_.toAttribute)
     
    -      // Final-only, Complete-only and Final-Complete: every output row contains values representing
    -      // resultExpressions.
    -      case (Some(Final), None) | (Some(Final) | None, Some(Complete)) =>
    -        val bufferSchemata =
    -          allAggregateFunctions.flatMap(_.aggBufferAttributes)
    -        val evalExpressions = allAggregateFunctions.map {
    -          case ae: DeclarativeAggregate => ae.evaluateExpression
    -          case agg: AggregateFunction => NoOp
    -        }
    -        val expressionAggEvalProjection = newMutableProjection(evalExpressions, bufferSchemata)()
    -        val aggregateResultSchema = nonCompleteAggregateAttributes ++ completeAggregateAttributes
    -        // TODO: Use unsafe row.
    -        val aggregateResult = new SpecificMutableRow(aggregateResultSchema.map(_.dataType))
    -        expressionAggEvalProjection.target(aggregateResult)
    -        val resultProjection =
    -          newMutableProjection(
    -            resultExpressions, groupingKeyAttributes ++ aggregateResultSchema)()
    -        resultProjection.target(mutableOutput)
    -
    -        (currentGroupingKey: InternalRow, currentBuffer: MutableRow) => {
    -          // Generate results for all expression-based aggregate functions.
    -          expressionAggEvalProjection(currentBuffer)
    -          // Generate results for all imperative aggregate functions.
    -          var i = 0
    -          while (i < allImperativeAggregateFunctions.length) {
    -            aggregateResult.update(
    -              allImperativeAggregateFunctionPositions(i),
    -              allImperativeAggregateFunctions(i).eval(currentBuffer))
    -            i += 1
    -          }
    -          resultProjection(rowToBeEvaluated(currentGroupingKey, aggregateResult))
    +  // Initializing the function used to generate the output row.
    +  protected def generateResultProjection(): (UnsafeRow, MutableRow) => UnsafeRow = {
    +    val joinedRow = new JoinedRow
    +    val modes = aggregateExpressions.map(_.mode).distinct
    +    val bufferAttributes = aggregateFunctions.flatMap(_.aggBufferAttributes)
    +    if (modes.contains(Final) || modes.contains(Complete)) {
    --- End diff --
    
    Added a comment in the begging of AggregationIterator


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-164251993
  
    @yhuai I think having the two clearly separated paths (this PR) is an improvement of the current situation. I also admit that I am responsible for introducing the second path. Your comment on having four aggregate steps without the exhange triggered me, and I was thinking out loud on how we could do this using the rewriting rule (the removal of one of the paths would have been a bonus).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163726125
  
    **[Test build #47533 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47533/consoleFull)** for PR 10228 at commit [`3f60962`](https://github.com/apache/spark/commit/3f60962c2fd2f8f140714d0010dd0bb424b034b0).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-164255399
  
    **[Test build #47622 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47622/consoleFull)** for PR 10228 at commit [`51ca055`](https://github.com/apache/spark/commit/51ca0553c1b4f3307512954858741ad89cea89f6).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-164070492
  
    **[Test build #47601 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47601/consoleFull)** for PR 10228 at commit [`71e0b1c`](https://github.com/apache/spark/commit/71e0b1c29ec50c6ce6590dc43997e66f1b4011ac).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163720246
  
    **[Test build #47533 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47533/consoleFull)** for PR 10228 at commit [`3f60962`](https://github.com/apache/spark/commit/3f60962c2fd2f8f140714d0010dd0bb424b034b0).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163783929
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163844335
  
    **[Test build #47570 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47570/consoleFull)** for PR 10228 at commit [`740e725`](https://github.com/apache/spark/commit/740e7258f384931ae0856ac96779790617aed914).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/10228


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163747496
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163783889
  
    **[Test build #47544 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47544/consoleFull)** for PR 10228 at commit [`3f1ea7f`](https://github.com/apache/spark/commit/3f1ea7f86ec834e5a32af58de74f919b354f0d61).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-164089028
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47601/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163771976
  
    @hvanhovell If we could figure out a better solution, it's definitely welcomed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163726187
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163365216
  
    oh, just realized that the plan for a query like `SELECT COUNT(DISTINCT a) FROM table` will be
    ```
    AGG-2 (count distinct)
      Shuffle to a single reducer
        AGG-1 (grouping on a)
          Shuffle by a
            Partial-AGG-1 (grouping on a)
    ```
    Ideally, we should still use four aggregate operators like the one shown below but without the overhead of using Expand.
    ```
    AGG-2 (count distinct)
      Shuffle to a single reducer
        Partial-AGG-2 (count distinct)
          AGG-1 (grouping on a)
            Shuffle by a
              Partial-AGG-1 (grouping on a)
    ```
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163819515
  
    **[Test build #47559 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47559/consoleFull)** for PR 10228 at commit [`8262ad8`](https://github.com/apache/spark/commit/8262ad89eb49da0c36b76c8089d92d9ce5b45c52).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163846015
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-164264212
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47622/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163764840
  
    @davies don't get me wrong. I think this PR is an improvement of the current situation (it never crossed my mind to change partitioning when I was working on that part of the code), and should be added.
    
    I am also not to keen on changing the MultipleDistinctRewriter; given the time it'll take and the objections you've raised. The only thing that bugs me is, is that we currently rewrite distinct aggregates in two places.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163819535
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47559/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-164070674
  
    **[Test build #2212 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2212/consoleFull)** for PR 10228 at commit [`71e0b1c`](https://github.com/apache/spark/commit/71e0b1c29ec50c6ce6590dc43997e66f1b4011ac).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10228#discussion_r47277687
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala ---
    @@ -326,73 +192,61 @@ abstract class AggregationIterator(
         } else {
           safeOutputRow
         }
    -
    -    aggregationMode match {
    -      // Partial-only or PartialMerge-only: every output row is basically the values of
    -      // the grouping expressions and the corresponding aggregation buffer.
    -      case (Some(Partial), None) | (Some(PartialMerge), None) =>
    -        // Because we cannot copy a joinedRow containing a UnsafeRow (UnsafeRow does not
    -        // support generic getter), we create a mutable projection to output the
    -        // JoinedRow(currentGroupingKey, currentBuffer)
    -        val bufferSchema = nonCompleteAggregateFunctions.flatMap(_.aggBufferAttributes)
    -        val resultProjection =
    -          newMutableProjection(
    -            groupingKeyAttributes ++ bufferSchema,
    -            groupingKeyAttributes ++ bufferSchema)()
    -        resultProjection.target(mutableOutput)
    -
    -        (currentGroupingKey: InternalRow, currentBuffer: MutableRow) => {
    -          resultProjection(rowToBeEvaluated(currentGroupingKey, currentBuffer))
    -          // rowToBeEvaluated(currentGroupingKey, currentBuffer)
    -        }
    -
    +    val modes = aggregateExpressions.map(_.mode).distinct
    +    if (!modes.contains(Final) && !modes.contains(Complete)) {
    +      // Because we cannot copy a joinedRow containing a UnsafeRow (UnsafeRow does not
    +      // support generic getter), we create a mutable projection to output the
    +      // JoinedRow(currentGroupingKey, currentBuffer)
    +      val bufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes)
    +      val resultProjection =
    +        newMutableProjection(
    +          groupingKeyAttributes ++ bufferSchema,
    +          groupingKeyAttributes ++ bufferSchema)()
    +      resultProjection.target(mutableOutput)
    +      (currentGroupingKey: InternalRow, currentBuffer: MutableRow) => {
    +        resultProjection(rowToBeEvaluated(currentGroupingKey, currentBuffer))
    +      }
    +    } else if (aggregateExpressions.nonEmpty) {
           // Final-only, Complete-only and Final-Complete: every output row contains values representing
           // resultExpressions.
    -      case (Some(Final), None) | (Some(Final) | None, Some(Complete)) =>
    -        val bufferSchemata =
    -          allAggregateFunctions.flatMap(_.aggBufferAttributes)
    -        val evalExpressions = allAggregateFunctions.map {
    -          case ae: DeclarativeAggregate => ae.evaluateExpression
    -          case agg: AggregateFunction => NoOp
    -        }
    -        val expressionAggEvalProjection = newMutableProjection(evalExpressions, bufferSchemata)()
    -        val aggregateResultSchema = nonCompleteAggregateAttributes ++ completeAggregateAttributes
    -        // TODO: Use unsafe row.
    -        val aggregateResult = new SpecificMutableRow(aggregateResultSchema.map(_.dataType))
    -        expressionAggEvalProjection.target(aggregateResult)
    -        val resultProjection =
    -          newMutableProjection(
    -            resultExpressions, groupingKeyAttributes ++ aggregateResultSchema)()
    -        resultProjection.target(mutableOutput)
    -
    -        (currentGroupingKey: InternalRow, currentBuffer: MutableRow) => {
    -          // Generate results for all expression-based aggregate functions.
    -          expressionAggEvalProjection(currentBuffer)
    -          // Generate results for all imperative aggregate functions.
    -          var i = 0
    -          while (i < allImperativeAggregateFunctions.length) {
    -            aggregateResult.update(
    -              allImperativeAggregateFunctionPositions(i),
    -              allImperativeAggregateFunctions(i).eval(currentBuffer))
    -            i += 1
    -          }
    -          resultProjection(rowToBeEvaluated(currentGroupingKey, aggregateResult))
    +      val bufferSchemata = aggregateFunctions.flatMap(_.aggBufferAttributes)
    +      val evalExpressions = aggregateFunctions.map {
    +        case ae: DeclarativeAggregate => ae.evaluateExpression
    +        case agg: AggregateFunction => NoOp
    +      }
    +      val expressionAggEvalProjection = newMutableProjection(evalExpressions, bufferSchemata)()
    +      val aggregateResultSchema = aggregateAttributes
    +      // TODO: Use unsafe row.
    +      val aggregateResult = new SpecificMutableRow(aggregateResultSchema.map(_.dataType))
    +      expressionAggEvalProjection.target(aggregateResult)
    +
    +      val resultProjection =
    +        newMutableProjection(
    +          resultExpressions, groupingKeyAttributes ++ aggregateResultSchema)()
    +      resultProjection.target(mutableOutput)
    +
    +      (currentGroupingKey: InternalRow, currentBuffer: MutableRow) => {
    +        // Generate results for all expression-based aggregate functions.
    +        expressionAggEvalProjection(currentBuffer)
    +        // Generate results for all imperative aggregate functions.
    +        var i = 0
    +        while (i < allImperativeAggregateFunctions.length) {
    +          aggregateResult.update(
    +            allImperativeAggregateFunctionPositions(i),
    +            allImperativeAggregateFunctions(i).eval(currentBuffer))
    +          i += 1
             }
    -
    +        resultProjection(rowToBeEvaluated(currentGroupingKey, aggregateResult))
    +      }
    +    } else {
           // Grouping-only: we only output values of grouping expressions.
    --- End diff --
    
    Should this projection be necesary? Is this not just currentGroupingKey?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-164069910
  
    **[Test build #2211 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2211/consoleFull)** for PR 10228 at commit [`740e725`](https://github.com/apache/spark/commit/740e7258f384931ae0856ac96779790617aed914).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-164085613
  
    **[Test build #2211 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2211/consoleFull)** for PR 10228 at commit [`740e725`](https://github.com/apache/spark/commit/740e7258f384931ae0856ac96779790617aed914).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-164219062
  
    @hvanhovell With this change, we will use the planner rule to handle single distinct aggregation and use the rewriter to handle multiple distinct aggregations, which is the same as when you originally introduced the rewriter. I guess this workflow is better than having two logics that handle the same case. What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163846016
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47570/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163699118
  
    We could move the planning of a distinct queries entirely to the DistinctAggregateRewriter. This would require us to merge the non-distinct aggregate paths and the first distinct group aggregate path, so we could avoid the expand in case of a single disinct column group.
    
    This is quite a bit of work; I don't know if this is worth the effort.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10228#discussion_r47276008
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala ---
    @@ -165,155 +134,52 @@ abstract class AggregationIterator(
     
       // Initializing functions used to process a row.
       protected val processRow: (MutableRow, InternalRow) => Unit = {
    -    val rowToBeProcessed = new JoinedRow
    -    val aggregationBufferSchema = allAggregateFunctions.flatMap(_.aggBufferAttributes)
    -    aggregationMode match {
    -      // Partial-only
    -      case (Some(Partial), None) =>
    -        val updateExpressions = nonCompleteAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.updateExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        val expressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          expressionAggUpdateProjection.target(currentBuffer)
    -          // Process all expression-based aggregate functions.
    -          expressionAggUpdateProjection(rowToBeProcessed(currentBuffer, row))
    -          // Process all imperative aggregate functions.
    -          var i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // PartialMerge-only or Final-only
    -      case (Some(PartialMerge), None) | (Some(Final), None) =>
    -        val inputAggregationBufferSchema = if (initialInputBufferOffset == 0) {
    -          // If initialInputBufferOffset, the input value does not contain
    -          // grouping keys.
    -          // This part is pretty hacky.
    -          allAggregateFunctions.flatMap(_.inputAggBufferAttributes).toSeq
    -        } else {
    -          groupingKeyAttributes ++ allAggregateFunctions.flatMap(_.inputAggBufferAttributes)
    -        }
    -        // val inputAggregationBufferSchema =
    -        //  groupingKeyAttributes ++
    -        //    allAggregateFunctions.flatMap(_.cloneBufferAttributes)
    -        val mergeExpressions = nonCompleteAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.mergeExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        // This projection is used to merge buffer values for all expression-based aggregates.
    -        val expressionAggMergeProjection =
    -          newMutableProjection(
    -            mergeExpressions,
    -            aggregationBufferSchema ++ inputAggregationBufferSchema)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          // Process all expression-based aggregate functions.
    -          expressionAggMergeProjection.target(currentBuffer)(rowToBeProcessed(currentBuffer, row))
    -          // Process all imperative aggregate functions.
    -          var i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).merge(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // Final-Complete
    -      case (Some(Final), Some(Complete)) =>
    -        val completeAggregateFunctions: Array[AggregateFunction] =
    -          allAggregateFunctions.takeRight(completeAggregateExpressions.length)
    -        // All imperative aggregate functions with mode Complete.
    -        val completeImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          completeAggregateFunctions.collect { case func: ImperativeAggregate => func }
    -
    -        // The first initialInputBufferOffset values of the input aggregation buffer is
    -        // for grouping expressions and distinct columns.
    -        val groupingAttributesAndDistinctColumns = valueAttributes.take(initialInputBufferOffset)
    -
    -        val completeOffsetExpressions =
    -          Seq.fill(completeAggregateFunctions.map(_.aggBufferAttributes.length).sum)(NoOp)
    -        // We do not touch buffer values of aggregate functions with the Final mode.
    -        val finalOffsetExpressions =
    -          Seq.fill(nonCompleteAggregateFunctions.map(_.aggBufferAttributes.length).sum)(NoOp)
    -
    -        val mergeInputSchema =
    -          aggregationBufferSchema ++
    -            groupingAttributesAndDistinctColumns ++
    -            nonCompleteAggregateFunctions.flatMap(_.inputAggBufferAttributes)
    -        val mergeExpressions =
    -          nonCompleteAggregateFunctions.flatMap {
    -            case ae: DeclarativeAggregate => ae.mergeExpressions
    -            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -          } ++ completeOffsetExpressions
    -        val finalExpressionAggMergeProjection =
    -          newMutableProjection(mergeExpressions, mergeInputSchema)()
    -
    -        val updateExpressions =
    -          finalOffsetExpressions ++ completeAggregateFunctions.flatMap {
    -            case ae: DeclarativeAggregate => ae.updateExpressions
    -            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -          }
    -        val completeExpressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          val input = rowToBeProcessed(currentBuffer, row)
    -          // For all aggregate functions with mode Complete, update buffers.
    -          completeExpressionAggUpdateProjection.target(currentBuffer)(input)
    -          var i = 0
    -          while (i < completeImperativeAggregateFunctions.length) {
    -            completeImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -
    -          // For all aggregate functions with mode Final, merge buffers.
    -          finalExpressionAggMergeProjection.target(currentBuffer)(input)
    -          i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).merge(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // Complete-only
    -      case (None, Some(Complete)) =>
    -        val completeAggregateFunctions: Array[AggregateFunction] =
    -          allAggregateFunctions.takeRight(completeAggregateExpressions.length)
    -        // All imperative aggregate functions with mode Complete.
    -        val completeImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          completeAggregateFunctions.collect { case func: ImperativeAggregate => func }
    -
    -        val updateExpressions =
    -          completeAggregateFunctions.flatMap {
    -            case ae: DeclarativeAggregate => ae.updateExpressions
    -            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    +    val joinedRow = new JoinedRow
    +    val aggregationBufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes)
    +    val modes = aggregateExpressions.map(_.mode).distinct
    +    if (aggregateExpressions.nonEmpty) {
    +      val inputAggregationBufferSchema = if (initialInputBufferOffset == 0) {
    --- End diff --
    
    Why doesn't this just check groupingKeyAttributes.nonEmpty?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163724932
  
    **[Test build #47536 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47536/consoleFull)** for PR 10228 at commit [`a9eae30`](https://github.com/apache/spark/commit/a9eae303166d6c3ba1f80a22265482b9f4d0a525).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163819533
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10228#discussion_r47440002
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala ---
    @@ -97,378 +89,62 @@ class TungstenAggregationIterator(
         numOutputRows: LongSQLMetric,
         dataSize: LongSQLMetric,
         spillSize: LongSQLMetric)
    -  extends Iterator[UnsafeRow] with Logging {
    +  extends AggregationIterator(
    +    groupingExpressions,
    +    originalInputAttributes,
    +    aggregateExpressions,
    +    aggregateAttributes,
    +    initialInputBufferOffset,
    +    resultExpressions,
    +    newMutableProjection) with Logging {
     
       ///////////////////////////////////////////////////////////////////////////
       // Part 1: Initializing aggregate functions.
       ///////////////////////////////////////////////////////////////////////////
     
    -  // A Seq containing all AggregateExpressions.
    -  // It is important that all AggregateExpressions with the mode Partial, PartialMerge or Final
    -  // are at the beginning of the allAggregateExpressions.
    -  private[this] val allAggregateExpressions: Seq[AggregateExpression] =
    -    nonCompleteAggregateExpressions ++ completeAggregateExpressions
    -
    -  // Check to make sure we do not have more than three modes in our AggregateExpressions.
    -  // If we have, users are hitting a bug and we throw an IllegalStateException.
    -  if (allAggregateExpressions.map(_.mode).distinct.length > 2) {
    -    throw new IllegalStateException(
    -      s"$allAggregateExpressions should have no more than 2 kinds of modes.")
    -  }
    -
       // Remember spill data size of this task before execute this operator so that we can
       // figure out how many bytes we spilled for this operator.
       private val spillSizeBefore = TaskContext.get().taskMetrics().memoryBytesSpilled
     
    -  //
    -  // The modes of AggregateExpressions. Right now, we can handle the following mode:
    -  //  - Partial-only:
    -  //      All AggregateExpressions have the mode of Partial.
    -  //      For this case, aggregationMode is (Some(Partial), None).
    -  //  - PartialMerge-only:
    -  //      All AggregateExpressions have the mode of PartialMerge).
    -  //      For this case, aggregationMode is (Some(PartialMerge), None).
    -  //  - Final-only:
    -  //      All AggregateExpressions have the mode of Final.
    -  //      For this case, aggregationMode is (Some(Final), None).
    -  //  - Final-Complete:
    -  //      Some AggregateExpressions have the mode of Final and
    -  //      others have the mode of Complete. For this case,
    -  //      aggregationMode is (Some(Final), Some(Complete)).
    -  //  - Complete-only:
    -  //      nonCompleteAggregateExpressions is empty and we have AggregateExpressions
    -  //      with mode Complete in completeAggregateExpressions. For this case,
    -  //      aggregationMode is (None, Some(Complete)).
    -  //  - Grouping-only:
    -  //      There is no AggregateExpression. For this case, AggregationMode is (None,None).
    -  //
    -  private[this] var aggregationMode: (Option[AggregateMode], Option[AggregateMode]) = {
    -    nonCompleteAggregateExpressions.map(_.mode).distinct.headOption ->
    -      completeAggregateExpressions.map(_.mode).distinct.headOption
    -  }
    -
    -  // Initialize all AggregateFunctions by binding references, if necessary,
    -  // and setting inputBufferOffset and mutableBufferOffset.
    -  private def initializeAllAggregateFunctions(
    -      startingInputBufferOffset: Int): Array[AggregateFunction] = {
    -    var mutableBufferOffset = 0
    -    var inputBufferOffset: Int = startingInputBufferOffset
    -    val functions = new Array[AggregateFunction](allAggregateExpressions.length)
    -    var i = 0
    -    while (i < allAggregateExpressions.length) {
    -      val func = allAggregateExpressions(i).aggregateFunction
    -      val aggregateExpressionIsNonComplete = i < nonCompleteAggregateExpressions.length
    -      // We need to use this mode instead of func.mode in order to handle aggregation mode switching
    -      // when switching to sort-based aggregation:
    -      val mode = if (aggregateExpressionIsNonComplete) aggregationMode._1 else aggregationMode._2
    -      val funcWithBoundReferences = mode match {
    -        case Some(Partial) | Some(Complete) if func.isInstanceOf[ImperativeAggregate] =>
    -          // We need to create BoundReferences if the function is not an
    -          // expression-based aggregate function (it does not support code-gen) and the mode of
    -          // this function is Partial or Complete because we will call eval of this
    -          // function's children in the update method of this aggregate function.
    -          // Those eval calls require BoundReferences to work.
    -          BindReferences.bindReference(func, originalInputAttributes)
    -        case _ =>
    -          // We only need to set inputBufferOffset for aggregate functions with mode
    -          // PartialMerge and Final.
    -          val updatedFunc = func match {
    -            case function: ImperativeAggregate =>
    -              function.withNewInputAggBufferOffset(inputBufferOffset)
    -            case function => function
    -          }
    -          inputBufferOffset += func.aggBufferSchema.length
    -          updatedFunc
    -      }
    -      val funcWithUpdatedAggBufferOffset = funcWithBoundReferences match {
    -        case function: ImperativeAggregate =>
    -          // Set mutableBufferOffset for this function. It is important that setting
    -          // mutableBufferOffset happens after all potential bindReference operations
    -          // because bindReference will create a new instance of the function.
    -          function.withNewMutableAggBufferOffset(mutableBufferOffset)
    -        case function => function
    -      }
    -      mutableBufferOffset += funcWithUpdatedAggBufferOffset.aggBufferSchema.length
    -      functions(i) = funcWithUpdatedAggBufferOffset
    -      i += 1
    -    }
    -    functions
    -  }
    -
    -  private[this] var allAggregateFunctions: Array[AggregateFunction] =
    -    initializeAllAggregateFunctions(initialInputBufferOffset)
    -
    -  // Positions of those imperative aggregate functions in allAggregateFunctions.
    -  // For example, say that we have func1, func2, func3, func4 in aggregateFunctions, and
    -  // func2 and func3 are imperative aggregate functions. Then
    -  // allImperativeAggregateFunctionPositions will be [1, 2]. Note that this does not need to be
    -  // updated when falling back to sort-based aggregation because the positions of the aggregate
    -  // functions do not change in that case.
    -  private[this] val allImperativeAggregateFunctionPositions: Array[Int] = {
    -    val positions = new ArrayBuffer[Int]()
    -    var i = 0
    -    while (i < allAggregateFunctions.length) {
    -      allAggregateFunctions(i) match {
    -        case agg: DeclarativeAggregate =>
    -        case _ => positions += i
    -      }
    -      i += 1
    -    }
    -    positions.toArray
    -  }
    -
       ///////////////////////////////////////////////////////////////////////////
       // Part 2: Methods and fields used by setting aggregation buffer values,
       //         processing input rows from inputIter, and generating output
       //         rows.
       ///////////////////////////////////////////////////////////////////////////
     
    -  // The projection used to initialize buffer values for all expression-based aggregates.
    -  // Note that this projection does not need to be updated when switching to sort-based aggregation
    -  // because the schema of empty aggregation buffers does not change in that case.
    -  private[this] val expressionAggInitialProjection: MutableProjection = {
    -    val initExpressions = allAggregateFunctions.flatMap {
    -      case ae: DeclarativeAggregate => ae.initialValues
    -      // For the positions corresponding to imperative aggregate functions, we'll use special
    -      // no-op expressions which are ignored during projection code-generation.
    -      case i: ImperativeAggregate => Seq.fill(i.aggBufferAttributes.length)(NoOp)
    -    }
    -    newMutableProjection(initExpressions, Nil)()
    -  }
    -
       // Creates a new aggregation buffer and initializes buffer values.
    -  // This function should be only called at most three times (when we create the hash map,
    -  // when we switch to sort-based aggregation, and when we create the re-used buffer for
    -  // sort-based aggregation).
    +  // This function should be only called at most two times (when we create the hash map,
    +  // and when we create the re-used buffer for sort-based aggregation).
       private def createNewAggregationBuffer(): UnsafeRow = {
    -    val bufferSchema = allAggregateFunctions.flatMap(_.aggBufferAttributes)
    +    val bufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes)
         val buffer: UnsafeRow = UnsafeProjection.create(bufferSchema.map(_.dataType))
           .apply(new GenericMutableRow(bufferSchema.length))
         // Initialize declarative aggregates' buffer values
         expressionAggInitialProjection.target(buffer)(EmptyRow)
         // Initialize imperative aggregates' buffer values
    -    allAggregateFunctions.collect { case f: ImperativeAggregate => f }.foreach(_.initialize(buffer))
    +    aggregateFunctions.collect { case f: ImperativeAggregate => f }.foreach(_.initialize(buffer))
         buffer
       }
     
    -  // Creates a function used to process a row based on the given inputAttributes.
    -  private def generateProcessRow(
    -      inputAttributes: Seq[Attribute]): (UnsafeRow, InternalRow) => Unit = {
    -
    -    val aggregationBufferAttributes = allAggregateFunctions.flatMap(_.aggBufferAttributes)
    -    val joinedRow = new JoinedRow()
    -
    -    aggregationMode match {
    -      // Partial-only
    -      case (Some(Partial), None) =>
    -        val updateExpressions = allAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.updateExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        val imperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          allAggregateFunctions.collect { case func: ImperativeAggregate => func}
    -        val expressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferAttributes ++ inputAttributes)()
    -
    -        (currentBuffer: UnsafeRow, row: InternalRow) => {
    -          expressionAggUpdateProjection.target(currentBuffer)
    -          // Process all expression-based aggregate functions.
    -          expressionAggUpdateProjection(joinedRow(currentBuffer, row))
    -          // Process all imperative aggregate functions
    -          var i = 0
    -          while (i < imperativeAggregateFunctions.length) {
    -            imperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // PartialMerge-only or Final-only
    -      case (Some(PartialMerge), None) | (Some(Final), None) =>
    -        val mergeExpressions = allAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.mergeExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        val imperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          allAggregateFunctions.collect { case func: ImperativeAggregate => func}
    -        // This projection is used to merge buffer values for all expression-based aggregates.
    -        val expressionAggMergeProjection =
    -          newMutableProjection(mergeExpressions, aggregationBufferAttributes ++ inputAttributes)()
    -
    -        (currentBuffer: UnsafeRow, row: InternalRow) => {
    -          // Process all expression-based aggregate functions.
    -          expressionAggMergeProjection.target(currentBuffer)(joinedRow(currentBuffer, row))
    -          // Process all imperative aggregate functions.
    -          var i = 0
    -          while (i < imperativeAggregateFunctions.length) {
    -            imperativeAggregateFunctions(i).merge(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // Final-Complete
    -      case (Some(Final), Some(Complete)) =>
    -        val completeAggregateFunctions: Array[AggregateFunction] =
    -          allAggregateFunctions.takeRight(completeAggregateExpressions.length)
    -        val completeImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          completeAggregateFunctions.collect { case func: ImperativeAggregate => func }
    -        val nonCompleteAggregateFunctions: Array[AggregateFunction] =
    -          allAggregateFunctions.take(nonCompleteAggregateExpressions.length)
    -        val nonCompleteImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          nonCompleteAggregateFunctions.collect { case func: ImperativeAggregate => func }
    -
    -        val completeOffsetExpressions =
    -          Seq.fill(completeAggregateFunctions.map(_.aggBufferAttributes.length).sum)(NoOp)
    -        val mergeExpressions =
    -          nonCompleteAggregateFunctions.flatMap {
    -            case ae: DeclarativeAggregate => ae.mergeExpressions
    -            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -          } ++ completeOffsetExpressions
    -        val finalMergeProjection =
    -          newMutableProjection(mergeExpressions, aggregationBufferAttributes ++ inputAttributes)()
    -
    -        // We do not touch buffer values of aggregate functions with the Final mode.
    -        val finalOffsetExpressions =
    -          Seq.fill(nonCompleteAggregateFunctions.map(_.aggBufferAttributes.length).sum)(NoOp)
    -        val updateExpressions = finalOffsetExpressions ++ completeAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.updateExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        val completeUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferAttributes ++ inputAttributes)()
    -
    -        (currentBuffer: UnsafeRow, row: InternalRow) => {
    -          val input = joinedRow(currentBuffer, row)
    -          // For all aggregate functions with mode Complete, update buffers.
    -          completeUpdateProjection.target(currentBuffer)(input)
    -          var i = 0
    -          while (i < completeImperativeAggregateFunctions.length) {
    -            completeImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -
    -          // For all aggregate functions with mode Final, merge buffer values in row to
    -          // currentBuffer.
    -          finalMergeProjection.target(currentBuffer)(input)
    -          i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).merge(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // Complete-only
    -      case (None, Some(Complete)) =>
    -        val completeAggregateFunctions: Array[AggregateFunction] =
    -          allAggregateFunctions.takeRight(completeAggregateExpressions.length)
    -        // All imperative aggregate functions with mode Complete.
    -        val completeImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          completeAggregateFunctions.collect { case func: ImperativeAggregate => func }
    -
    -        val updateExpressions = completeAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.updateExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        val completeExpressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferAttributes ++ inputAttributes)()
    -
    -        (currentBuffer: UnsafeRow, row: InternalRow) => {
    -          // For all aggregate functions with mode Complete, update buffers.
    -          completeExpressionAggUpdateProjection.target(currentBuffer)(joinedRow(currentBuffer, row))
    -          var i = 0
    -          while (i < completeImperativeAggregateFunctions.length) {
    -            completeImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // Grouping only.
    -      case (None, None) => (currentBuffer: UnsafeRow, row: InternalRow) => {}
    -
    -      case other =>
    -        throw new IllegalStateException(
    -          s"${aggregationMode} should not be passed into TungstenAggregationIterator.")
    -    }
    -  }
    -
       // Creates a function used to generate output rows.
    -  private def generateResultProjection(): (UnsafeRow, UnsafeRow) => UnsafeRow = {
    -
    -    val groupingAttributes = groupingExpressions.map(_.toAttribute)
    -    val bufferAttributes = allAggregateFunctions.flatMap(_.aggBufferAttributes)
    -
    -    aggregationMode match {
    -      // Partial-only or PartialMerge-only: every output row is basically the values of
    -      // the grouping expressions and the corresponding aggregation buffer.
    -      case (Some(Partial), None) | (Some(PartialMerge), None) =>
    -        val groupingKeySchema = StructType.fromAttributes(groupingAttributes)
    -        val bufferSchema = StructType.fromAttributes(bufferAttributes)
    -        val unsafeRowJoiner = GenerateUnsafeRowJoiner.create(groupingKeySchema, bufferSchema)
    -
    -        (currentGroupingKey: UnsafeRow, currentBuffer: UnsafeRow) => {
    -          unsafeRowJoiner.join(currentGroupingKey, currentBuffer)
    -        }
    -
    -      // Final-only, Complete-only and Final-Complete: a output row is generated based on
    -      // resultExpressions.
    -      case (Some(Final), None) | (Some(Final) | None, Some(Complete)) =>
    -        val joinedRow = new JoinedRow()
    -        val evalExpressions = allAggregateFunctions.map {
    -          case ae: DeclarativeAggregate => ae.evaluateExpression
    -          case agg: AggregateFunction => NoOp
    -        }
    -        val expressionAggEvalProjection = newMutableProjection(evalExpressions, bufferAttributes)()
    -        // These are the attributes of the row produced by `expressionAggEvalProjection`
    -        val aggregateResultSchema = nonCompleteAggregateAttributes ++ completeAggregateAttributes
    -        val aggregateResult = new SpecificMutableRow(aggregateResultSchema.map(_.dataType))
    -        expressionAggEvalProjection.target(aggregateResult)
    -        val resultProjection =
    -          UnsafeProjection.create(resultExpressions, groupingAttributes ++ aggregateResultSchema)
    -
    -        val allImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          allAggregateFunctions.collect { case func: ImperativeAggregate => func}
    -
    -        (currentGroupingKey: UnsafeRow, currentBuffer: UnsafeRow) => {
    -          // Generate results for all expression-based aggregate functions.
    -          expressionAggEvalProjection(currentBuffer)
    -          // Generate results for all imperative aggregate functions.
    -          var i = 0
    -          while (i < allImperativeAggregateFunctions.length) {
    -            aggregateResult.update(
    -              allImperativeAggregateFunctionPositions(i),
    -              allImperativeAggregateFunctions(i).eval(currentBuffer))
    -            i += 1
    -          }
    -          resultProjection(joinedRow(currentGroupingKey, aggregateResult))
    -        }
    -
    -      // Grouping-only: a output row is generated from values of grouping expressions.
    -      case (None, None) =>
    -        val resultProjection = UnsafeProjection.create(resultExpressions, groupingAttributes)
    -
    -        (currentGroupingKey: UnsafeRow, currentBuffer: UnsafeRow) => {
    -          resultProjection(currentGroupingKey)
    -        }
    -
    -      case other =>
    -        throw new IllegalStateException(
    -          s"${aggregationMode} should not be passed into TungstenAggregationIterator.")
    +  override def generateResultProjection(): (UnsafeRow, MutableRow) => UnsafeRow = {
    +    val modes = aggregateExpressions.map(_.mode).distinct
    +    if (modes.nonEmpty && !modes.contains(Final) && !modes.contains(Complete)) {
    --- End diff --
    
    Add a check in case there is any invalid mode?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-164089106
  
    **[Test build #2212 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2212/consoleFull)** for PR 10228 at commit [`71e0b1c`](https://github.com/apache/spark/commit/71e0b1c29ec50c6ce6590dc43997e66f1b4011ac).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163726189
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47533/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-164263964
  
    **[Test build #47622 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47622/consoleFull)** for PR 10228 at commit [`51ca055`](https://github.com/apache/spark/commit/51ca0553c1b4f3307512954858741ad89cea89f6).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163364158
  
    **[Test build #47443 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47443/consoleFull)** for PR 10228 at commit [`5e42c76`](https://github.com/apache/spark/commit/5e42c7620b2f56349c0627d3da2f8407553b74cf).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-164284205
  
    @hvanhovell Yea, it's great to think about how to use a single rule to handle aggregation queries with distinct after we have this improvement. The logical rewriter rules probably is a good place because rewriting logical plans is easier. If it is the right approach, we can make some changes to our physical planner to make it respect the aggregation mode of an agg expression in a logical agg operator (right now, our physical planner always ignore the mode). So, when we create physical plan, we can understand that, for example, a logical agg operator is used to merge aggregation buffers.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10228#discussion_r47444202
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala ---
    @@ -165,237 +137,100 @@ abstract class AggregationIterator(
     
       // Initializing functions used to process a row.
       protected val processRow: (MutableRow, InternalRow) => Unit = {
    -    val rowToBeProcessed = new JoinedRow
    -    val aggregationBufferSchema = allAggregateFunctions.flatMap(_.aggBufferAttributes)
    -    aggregationMode match {
    -      // Partial-only
    -      case (Some(Partial), None) =>
    -        val updateExpressions = nonCompleteAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.updateExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        val expressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          expressionAggUpdateProjection.target(currentBuffer)
    -          // Process all expression-based aggregate functions.
    -          expressionAggUpdateProjection(rowToBeProcessed(currentBuffer, row))
    -          // Process all imperative aggregate functions.
    -          var i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // PartialMerge-only or Final-only
    -      case (Some(PartialMerge), None) | (Some(Final), None) =>
    -        val inputAggregationBufferSchema = if (initialInputBufferOffset == 0) {
    -          // If initialInputBufferOffset, the input value does not contain
    -          // grouping keys.
    -          // This part is pretty hacky.
    -          allAggregateFunctions.flatMap(_.inputAggBufferAttributes).toSeq
    -        } else {
    -          groupingKeyAttributes ++ allAggregateFunctions.flatMap(_.inputAggBufferAttributes)
    -        }
    -        // val inputAggregationBufferSchema =
    -        //  groupingKeyAttributes ++
    -        //    allAggregateFunctions.flatMap(_.cloneBufferAttributes)
    -        val mergeExpressions = nonCompleteAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.mergeExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        // This projection is used to merge buffer values for all expression-based aggregates.
    -        val expressionAggMergeProjection =
    -          newMutableProjection(
    -            mergeExpressions,
    -            aggregationBufferSchema ++ inputAggregationBufferSchema)()
    -
    -        (currentBuffer: MutableRow, row: InternalRow) => {
    -          // Process all expression-based aggregate functions.
    -          expressionAggMergeProjection.target(currentBuffer)(rowToBeProcessed(currentBuffer, row))
    -          // Process all imperative aggregate functions.
    -          var i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).merge(currentBuffer, row)
    -            i += 1
    +    val joinedRow = new JoinedRow
    +    if (aggregateExpressions.nonEmpty) {
    +      val mergeExpressions = aggregateFunctions.zipWithIndex.flatMap {
    --- End diff --
    
    ```zip(aggregateExpressions)``` ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10228#discussion_r47440000
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala ---
    @@ -97,378 +89,62 @@ class TungstenAggregationIterator(
         numOutputRows: LongSQLMetric,
         dataSize: LongSQLMetric,
         spillSize: LongSQLMetric)
    -  extends Iterator[UnsafeRow] with Logging {
    +  extends AggregationIterator(
    +    groupingExpressions,
    +    originalInputAttributes,
    +    aggregateExpressions,
    +    aggregateAttributes,
    +    initialInputBufferOffset,
    +    resultExpressions,
    +    newMutableProjection) with Logging {
     
       ///////////////////////////////////////////////////////////////////////////
       // Part 1: Initializing aggregate functions.
       ///////////////////////////////////////////////////////////////////////////
     
    -  // A Seq containing all AggregateExpressions.
    -  // It is important that all AggregateExpressions with the mode Partial, PartialMerge or Final
    -  // are at the beginning of the allAggregateExpressions.
    -  private[this] val allAggregateExpressions: Seq[AggregateExpression] =
    -    nonCompleteAggregateExpressions ++ completeAggregateExpressions
    -
    -  // Check to make sure we do not have more than three modes in our AggregateExpressions.
    -  // If we have, users are hitting a bug and we throw an IllegalStateException.
    -  if (allAggregateExpressions.map(_.mode).distinct.length > 2) {
    -    throw new IllegalStateException(
    -      s"$allAggregateExpressions should have no more than 2 kinds of modes.")
    -  }
    -
       // Remember spill data size of this task before execute this operator so that we can
       // figure out how many bytes we spilled for this operator.
       private val spillSizeBefore = TaskContext.get().taskMetrics().memoryBytesSpilled
     
    -  //
    -  // The modes of AggregateExpressions. Right now, we can handle the following mode:
    -  //  - Partial-only:
    -  //      All AggregateExpressions have the mode of Partial.
    -  //      For this case, aggregationMode is (Some(Partial), None).
    -  //  - PartialMerge-only:
    -  //      All AggregateExpressions have the mode of PartialMerge).
    -  //      For this case, aggregationMode is (Some(PartialMerge), None).
    -  //  - Final-only:
    -  //      All AggregateExpressions have the mode of Final.
    -  //      For this case, aggregationMode is (Some(Final), None).
    -  //  - Final-Complete:
    -  //      Some AggregateExpressions have the mode of Final and
    -  //      others have the mode of Complete. For this case,
    -  //      aggregationMode is (Some(Final), Some(Complete)).
    -  //  - Complete-only:
    -  //      nonCompleteAggregateExpressions is empty and we have AggregateExpressions
    -  //      with mode Complete in completeAggregateExpressions. For this case,
    -  //      aggregationMode is (None, Some(Complete)).
    -  //  - Grouping-only:
    -  //      There is no AggregateExpression. For this case, AggregationMode is (None,None).
    -  //
    -  private[this] var aggregationMode: (Option[AggregateMode], Option[AggregateMode]) = {
    -    nonCompleteAggregateExpressions.map(_.mode).distinct.headOption ->
    -      completeAggregateExpressions.map(_.mode).distinct.headOption
    -  }
    -
    -  // Initialize all AggregateFunctions by binding references, if necessary,
    -  // and setting inputBufferOffset and mutableBufferOffset.
    -  private def initializeAllAggregateFunctions(
    -      startingInputBufferOffset: Int): Array[AggregateFunction] = {
    -    var mutableBufferOffset = 0
    -    var inputBufferOffset: Int = startingInputBufferOffset
    -    val functions = new Array[AggregateFunction](allAggregateExpressions.length)
    -    var i = 0
    -    while (i < allAggregateExpressions.length) {
    -      val func = allAggregateExpressions(i).aggregateFunction
    -      val aggregateExpressionIsNonComplete = i < nonCompleteAggregateExpressions.length
    -      // We need to use this mode instead of func.mode in order to handle aggregation mode switching
    -      // when switching to sort-based aggregation:
    -      val mode = if (aggregateExpressionIsNonComplete) aggregationMode._1 else aggregationMode._2
    -      val funcWithBoundReferences = mode match {
    -        case Some(Partial) | Some(Complete) if func.isInstanceOf[ImperativeAggregate] =>
    -          // We need to create BoundReferences if the function is not an
    -          // expression-based aggregate function (it does not support code-gen) and the mode of
    -          // this function is Partial or Complete because we will call eval of this
    -          // function's children in the update method of this aggregate function.
    -          // Those eval calls require BoundReferences to work.
    -          BindReferences.bindReference(func, originalInputAttributes)
    -        case _ =>
    -          // We only need to set inputBufferOffset for aggregate functions with mode
    -          // PartialMerge and Final.
    -          val updatedFunc = func match {
    -            case function: ImperativeAggregate =>
    -              function.withNewInputAggBufferOffset(inputBufferOffset)
    -            case function => function
    -          }
    -          inputBufferOffset += func.aggBufferSchema.length
    -          updatedFunc
    -      }
    -      val funcWithUpdatedAggBufferOffset = funcWithBoundReferences match {
    -        case function: ImperativeAggregate =>
    -          // Set mutableBufferOffset for this function. It is important that setting
    -          // mutableBufferOffset happens after all potential bindReference operations
    -          // because bindReference will create a new instance of the function.
    -          function.withNewMutableAggBufferOffset(mutableBufferOffset)
    -        case function => function
    -      }
    -      mutableBufferOffset += funcWithUpdatedAggBufferOffset.aggBufferSchema.length
    -      functions(i) = funcWithUpdatedAggBufferOffset
    -      i += 1
    -    }
    -    functions
    -  }
    -
    -  private[this] var allAggregateFunctions: Array[AggregateFunction] =
    -    initializeAllAggregateFunctions(initialInputBufferOffset)
    -
    -  // Positions of those imperative aggregate functions in allAggregateFunctions.
    -  // For example, say that we have func1, func2, func3, func4 in aggregateFunctions, and
    -  // func2 and func3 are imperative aggregate functions. Then
    -  // allImperativeAggregateFunctionPositions will be [1, 2]. Note that this does not need to be
    -  // updated when falling back to sort-based aggregation because the positions of the aggregate
    -  // functions do not change in that case.
    -  private[this] val allImperativeAggregateFunctionPositions: Array[Int] = {
    -    val positions = new ArrayBuffer[Int]()
    -    var i = 0
    -    while (i < allAggregateFunctions.length) {
    -      allAggregateFunctions(i) match {
    -        case agg: DeclarativeAggregate =>
    -        case _ => positions += i
    -      }
    -      i += 1
    -    }
    -    positions.toArray
    -  }
    -
       ///////////////////////////////////////////////////////////////////////////
       // Part 2: Methods and fields used by setting aggregation buffer values,
       //         processing input rows from inputIter, and generating output
       //         rows.
       ///////////////////////////////////////////////////////////////////////////
     
    -  // The projection used to initialize buffer values for all expression-based aggregates.
    -  // Note that this projection does not need to be updated when switching to sort-based aggregation
    -  // because the schema of empty aggregation buffers does not change in that case.
    -  private[this] val expressionAggInitialProjection: MutableProjection = {
    -    val initExpressions = allAggregateFunctions.flatMap {
    -      case ae: DeclarativeAggregate => ae.initialValues
    -      // For the positions corresponding to imperative aggregate functions, we'll use special
    -      // no-op expressions which are ignored during projection code-generation.
    -      case i: ImperativeAggregate => Seq.fill(i.aggBufferAttributes.length)(NoOp)
    -    }
    -    newMutableProjection(initExpressions, Nil)()
    -  }
    -
       // Creates a new aggregation buffer and initializes buffer values.
    -  // This function should be only called at most three times (when we create the hash map,
    -  // when we switch to sort-based aggregation, and when we create the re-used buffer for
    -  // sort-based aggregation).
    +  // This function should be only called at most two times (when we create the hash map,
    +  // and when we create the re-used buffer for sort-based aggregation).
       private def createNewAggregationBuffer(): UnsafeRow = {
    -    val bufferSchema = allAggregateFunctions.flatMap(_.aggBufferAttributes)
    +    val bufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes)
         val buffer: UnsafeRow = UnsafeProjection.create(bufferSchema.map(_.dataType))
           .apply(new GenericMutableRow(bufferSchema.length))
         // Initialize declarative aggregates' buffer values
         expressionAggInitialProjection.target(buffer)(EmptyRow)
         // Initialize imperative aggregates' buffer values
    -    allAggregateFunctions.collect { case f: ImperativeAggregate => f }.foreach(_.initialize(buffer))
    +    aggregateFunctions.collect { case f: ImperativeAggregate => f }.foreach(_.initialize(buffer))
         buffer
       }
     
    -  // Creates a function used to process a row based on the given inputAttributes.
    -  private def generateProcessRow(
    -      inputAttributes: Seq[Attribute]): (UnsafeRow, InternalRow) => Unit = {
    -
    -    val aggregationBufferAttributes = allAggregateFunctions.flatMap(_.aggBufferAttributes)
    -    val joinedRow = new JoinedRow()
    -
    -    aggregationMode match {
    -      // Partial-only
    -      case (Some(Partial), None) =>
    -        val updateExpressions = allAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.updateExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        val imperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          allAggregateFunctions.collect { case func: ImperativeAggregate => func}
    -        val expressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferAttributes ++ inputAttributes)()
    -
    -        (currentBuffer: UnsafeRow, row: InternalRow) => {
    -          expressionAggUpdateProjection.target(currentBuffer)
    -          // Process all expression-based aggregate functions.
    -          expressionAggUpdateProjection(joinedRow(currentBuffer, row))
    -          // Process all imperative aggregate functions
    -          var i = 0
    -          while (i < imperativeAggregateFunctions.length) {
    -            imperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // PartialMerge-only or Final-only
    -      case (Some(PartialMerge), None) | (Some(Final), None) =>
    -        val mergeExpressions = allAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.mergeExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        val imperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          allAggregateFunctions.collect { case func: ImperativeAggregate => func}
    -        // This projection is used to merge buffer values for all expression-based aggregates.
    -        val expressionAggMergeProjection =
    -          newMutableProjection(mergeExpressions, aggregationBufferAttributes ++ inputAttributes)()
    -
    -        (currentBuffer: UnsafeRow, row: InternalRow) => {
    -          // Process all expression-based aggregate functions.
    -          expressionAggMergeProjection.target(currentBuffer)(joinedRow(currentBuffer, row))
    -          // Process all imperative aggregate functions.
    -          var i = 0
    -          while (i < imperativeAggregateFunctions.length) {
    -            imperativeAggregateFunctions(i).merge(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // Final-Complete
    -      case (Some(Final), Some(Complete)) =>
    -        val completeAggregateFunctions: Array[AggregateFunction] =
    -          allAggregateFunctions.takeRight(completeAggregateExpressions.length)
    -        val completeImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          completeAggregateFunctions.collect { case func: ImperativeAggregate => func }
    -        val nonCompleteAggregateFunctions: Array[AggregateFunction] =
    -          allAggregateFunctions.take(nonCompleteAggregateExpressions.length)
    -        val nonCompleteImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          nonCompleteAggregateFunctions.collect { case func: ImperativeAggregate => func }
    -
    -        val completeOffsetExpressions =
    -          Seq.fill(completeAggregateFunctions.map(_.aggBufferAttributes.length).sum)(NoOp)
    -        val mergeExpressions =
    -          nonCompleteAggregateFunctions.flatMap {
    -            case ae: DeclarativeAggregate => ae.mergeExpressions
    -            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -          } ++ completeOffsetExpressions
    -        val finalMergeProjection =
    -          newMutableProjection(mergeExpressions, aggregationBufferAttributes ++ inputAttributes)()
    -
    -        // We do not touch buffer values of aggregate functions with the Final mode.
    -        val finalOffsetExpressions =
    -          Seq.fill(nonCompleteAggregateFunctions.map(_.aggBufferAttributes.length).sum)(NoOp)
    -        val updateExpressions = finalOffsetExpressions ++ completeAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.updateExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        val completeUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferAttributes ++ inputAttributes)()
    -
    -        (currentBuffer: UnsafeRow, row: InternalRow) => {
    -          val input = joinedRow(currentBuffer, row)
    -          // For all aggregate functions with mode Complete, update buffers.
    -          completeUpdateProjection.target(currentBuffer)(input)
    -          var i = 0
    -          while (i < completeImperativeAggregateFunctions.length) {
    -            completeImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -
    -          // For all aggregate functions with mode Final, merge buffer values in row to
    -          // currentBuffer.
    -          finalMergeProjection.target(currentBuffer)(input)
    -          i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).merge(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // Complete-only
    -      case (None, Some(Complete)) =>
    -        val completeAggregateFunctions: Array[AggregateFunction] =
    -          allAggregateFunctions.takeRight(completeAggregateExpressions.length)
    -        // All imperative aggregate functions with mode Complete.
    -        val completeImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          completeAggregateFunctions.collect { case func: ImperativeAggregate => func }
    -
    -        val updateExpressions = completeAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.updateExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        val completeExpressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferAttributes ++ inputAttributes)()
    -
    -        (currentBuffer: UnsafeRow, row: InternalRow) => {
    -          // For all aggregate functions with mode Complete, update buffers.
    -          completeExpressionAggUpdateProjection.target(currentBuffer)(joinedRow(currentBuffer, row))
    -          var i = 0
    -          while (i < completeImperativeAggregateFunctions.length) {
    -            completeImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // Grouping only.
    -      case (None, None) => (currentBuffer: UnsafeRow, row: InternalRow) => {}
    -
    -      case other =>
    -        throw new IllegalStateException(
    -          s"${aggregationMode} should not be passed into TungstenAggregationIterator.")
    -    }
    -  }
    -
       // Creates a function used to generate output rows.
    -  private def generateResultProjection(): (UnsafeRow, UnsafeRow) => UnsafeRow = {
    -
    -    val groupingAttributes = groupingExpressions.map(_.toAttribute)
    -    val bufferAttributes = allAggregateFunctions.flatMap(_.aggBufferAttributes)
    -
    -    aggregationMode match {
    -      // Partial-only or PartialMerge-only: every output row is basically the values of
    -      // the grouping expressions and the corresponding aggregation buffer.
    -      case (Some(Partial), None) | (Some(PartialMerge), None) =>
    -        val groupingKeySchema = StructType.fromAttributes(groupingAttributes)
    -        val bufferSchema = StructType.fromAttributes(bufferAttributes)
    -        val unsafeRowJoiner = GenerateUnsafeRowJoiner.create(groupingKeySchema, bufferSchema)
    -
    -        (currentGroupingKey: UnsafeRow, currentBuffer: UnsafeRow) => {
    -          unsafeRowJoiner.join(currentGroupingKey, currentBuffer)
    -        }
    -
    -      // Final-only, Complete-only and Final-Complete: a output row is generated based on
    -      // resultExpressions.
    -      case (Some(Final), None) | (Some(Final) | None, Some(Complete)) =>
    -        val joinedRow = new JoinedRow()
    -        val evalExpressions = allAggregateFunctions.map {
    -          case ae: DeclarativeAggregate => ae.evaluateExpression
    -          case agg: AggregateFunction => NoOp
    -        }
    -        val expressionAggEvalProjection = newMutableProjection(evalExpressions, bufferAttributes)()
    -        // These are the attributes of the row produced by `expressionAggEvalProjection`
    -        val aggregateResultSchema = nonCompleteAggregateAttributes ++ completeAggregateAttributes
    -        val aggregateResult = new SpecificMutableRow(aggregateResultSchema.map(_.dataType))
    -        expressionAggEvalProjection.target(aggregateResult)
    -        val resultProjection =
    -          UnsafeProjection.create(resultExpressions, groupingAttributes ++ aggregateResultSchema)
    -
    -        val allImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          allAggregateFunctions.collect { case func: ImperativeAggregate => func}
    -
    -        (currentGroupingKey: UnsafeRow, currentBuffer: UnsafeRow) => {
    -          // Generate results for all expression-based aggregate functions.
    -          expressionAggEvalProjection(currentBuffer)
    -          // Generate results for all imperative aggregate functions.
    -          var i = 0
    -          while (i < allImperativeAggregateFunctions.length) {
    -            aggregateResult.update(
    -              allImperativeAggregateFunctionPositions(i),
    -              allImperativeAggregateFunctions(i).eval(currentBuffer))
    -            i += 1
    -          }
    -          resultProjection(joinedRow(currentGroupingKey, aggregateResult))
    -        }
    -
    -      // Grouping-only: a output row is generated from values of grouping expressions.
    -      case (None, None) =>
    -        val resultProjection = UnsafeProjection.create(resultExpressions, groupingAttributes)
    -
    -        (currentGroupingKey: UnsafeRow, currentBuffer: UnsafeRow) => {
    -          resultProjection(currentGroupingKey)
    -        }
    -
    -      case other =>
    -        throw new IllegalStateException(
    -          s"${aggregationMode} should not be passed into TungstenAggregationIterator.")
    +  override def generateResultProjection(): (UnsafeRow, MutableRow) => UnsafeRow = {
    --- End diff --
    
    `override protected`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10228#discussion_r47440003
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala ---
    @@ -97,378 +89,62 @@ class TungstenAggregationIterator(
         numOutputRows: LongSQLMetric,
         dataSize: LongSQLMetric,
         spillSize: LongSQLMetric)
    -  extends Iterator[UnsafeRow] with Logging {
    +  extends AggregationIterator(
    +    groupingExpressions,
    +    originalInputAttributes,
    +    aggregateExpressions,
    +    aggregateAttributes,
    +    initialInputBufferOffset,
    +    resultExpressions,
    +    newMutableProjection) with Logging {
     
       ///////////////////////////////////////////////////////////////////////////
       // Part 1: Initializing aggregate functions.
       ///////////////////////////////////////////////////////////////////////////
     
    -  // A Seq containing all AggregateExpressions.
    -  // It is important that all AggregateExpressions with the mode Partial, PartialMerge or Final
    -  // are at the beginning of the allAggregateExpressions.
    -  private[this] val allAggregateExpressions: Seq[AggregateExpression] =
    -    nonCompleteAggregateExpressions ++ completeAggregateExpressions
    -
    -  // Check to make sure we do not have more than three modes in our AggregateExpressions.
    -  // If we have, users are hitting a bug and we throw an IllegalStateException.
    -  if (allAggregateExpressions.map(_.mode).distinct.length > 2) {
    -    throw new IllegalStateException(
    -      s"$allAggregateExpressions should have no more than 2 kinds of modes.")
    -  }
    -
       // Remember spill data size of this task before execute this operator so that we can
       // figure out how many bytes we spilled for this operator.
       private val spillSizeBefore = TaskContext.get().taskMetrics().memoryBytesSpilled
     
    -  //
    -  // The modes of AggregateExpressions. Right now, we can handle the following mode:
    -  //  - Partial-only:
    -  //      All AggregateExpressions have the mode of Partial.
    -  //      For this case, aggregationMode is (Some(Partial), None).
    -  //  - PartialMerge-only:
    -  //      All AggregateExpressions have the mode of PartialMerge).
    -  //      For this case, aggregationMode is (Some(PartialMerge), None).
    -  //  - Final-only:
    -  //      All AggregateExpressions have the mode of Final.
    -  //      For this case, aggregationMode is (Some(Final), None).
    -  //  - Final-Complete:
    -  //      Some AggregateExpressions have the mode of Final and
    -  //      others have the mode of Complete. For this case,
    -  //      aggregationMode is (Some(Final), Some(Complete)).
    -  //  - Complete-only:
    -  //      nonCompleteAggregateExpressions is empty and we have AggregateExpressions
    -  //      with mode Complete in completeAggregateExpressions. For this case,
    -  //      aggregationMode is (None, Some(Complete)).
    -  //  - Grouping-only:
    -  //      There is no AggregateExpression. For this case, AggregationMode is (None,None).
    -  //
    -  private[this] var aggregationMode: (Option[AggregateMode], Option[AggregateMode]) = {
    -    nonCompleteAggregateExpressions.map(_.mode).distinct.headOption ->
    -      completeAggregateExpressions.map(_.mode).distinct.headOption
    -  }
    -
    -  // Initialize all AggregateFunctions by binding references, if necessary,
    -  // and setting inputBufferOffset and mutableBufferOffset.
    -  private def initializeAllAggregateFunctions(
    -      startingInputBufferOffset: Int): Array[AggregateFunction] = {
    -    var mutableBufferOffset = 0
    -    var inputBufferOffset: Int = startingInputBufferOffset
    -    val functions = new Array[AggregateFunction](allAggregateExpressions.length)
    -    var i = 0
    -    while (i < allAggregateExpressions.length) {
    -      val func = allAggregateExpressions(i).aggregateFunction
    -      val aggregateExpressionIsNonComplete = i < nonCompleteAggregateExpressions.length
    -      // We need to use this mode instead of func.mode in order to handle aggregation mode switching
    -      // when switching to sort-based aggregation:
    -      val mode = if (aggregateExpressionIsNonComplete) aggregationMode._1 else aggregationMode._2
    -      val funcWithBoundReferences = mode match {
    -        case Some(Partial) | Some(Complete) if func.isInstanceOf[ImperativeAggregate] =>
    -          // We need to create BoundReferences if the function is not an
    -          // expression-based aggregate function (it does not support code-gen) and the mode of
    -          // this function is Partial or Complete because we will call eval of this
    -          // function's children in the update method of this aggregate function.
    -          // Those eval calls require BoundReferences to work.
    -          BindReferences.bindReference(func, originalInputAttributes)
    -        case _ =>
    -          // We only need to set inputBufferOffset for aggregate functions with mode
    -          // PartialMerge and Final.
    -          val updatedFunc = func match {
    -            case function: ImperativeAggregate =>
    -              function.withNewInputAggBufferOffset(inputBufferOffset)
    -            case function => function
    -          }
    -          inputBufferOffset += func.aggBufferSchema.length
    -          updatedFunc
    -      }
    -      val funcWithUpdatedAggBufferOffset = funcWithBoundReferences match {
    -        case function: ImperativeAggregate =>
    -          // Set mutableBufferOffset for this function. It is important that setting
    -          // mutableBufferOffset happens after all potential bindReference operations
    -          // because bindReference will create a new instance of the function.
    -          function.withNewMutableAggBufferOffset(mutableBufferOffset)
    -        case function => function
    -      }
    -      mutableBufferOffset += funcWithUpdatedAggBufferOffset.aggBufferSchema.length
    -      functions(i) = funcWithUpdatedAggBufferOffset
    -      i += 1
    -    }
    -    functions
    -  }
    -
    -  private[this] var allAggregateFunctions: Array[AggregateFunction] =
    -    initializeAllAggregateFunctions(initialInputBufferOffset)
    -
    -  // Positions of those imperative aggregate functions in allAggregateFunctions.
    -  // For example, say that we have func1, func2, func3, func4 in aggregateFunctions, and
    -  // func2 and func3 are imperative aggregate functions. Then
    -  // allImperativeAggregateFunctionPositions will be [1, 2]. Note that this does not need to be
    -  // updated when falling back to sort-based aggregation because the positions of the aggregate
    -  // functions do not change in that case.
    -  private[this] val allImperativeAggregateFunctionPositions: Array[Int] = {
    -    val positions = new ArrayBuffer[Int]()
    -    var i = 0
    -    while (i < allAggregateFunctions.length) {
    -      allAggregateFunctions(i) match {
    -        case agg: DeclarativeAggregate =>
    -        case _ => positions += i
    -      }
    -      i += 1
    -    }
    -    positions.toArray
    -  }
    -
       ///////////////////////////////////////////////////////////////////////////
       // Part 2: Methods and fields used by setting aggregation buffer values,
       //         processing input rows from inputIter, and generating output
       //         rows.
       ///////////////////////////////////////////////////////////////////////////
     
    -  // The projection used to initialize buffer values for all expression-based aggregates.
    -  // Note that this projection does not need to be updated when switching to sort-based aggregation
    -  // because the schema of empty aggregation buffers does not change in that case.
    -  private[this] val expressionAggInitialProjection: MutableProjection = {
    -    val initExpressions = allAggregateFunctions.flatMap {
    -      case ae: DeclarativeAggregate => ae.initialValues
    -      // For the positions corresponding to imperative aggregate functions, we'll use special
    -      // no-op expressions which are ignored during projection code-generation.
    -      case i: ImperativeAggregate => Seq.fill(i.aggBufferAttributes.length)(NoOp)
    -    }
    -    newMutableProjection(initExpressions, Nil)()
    -  }
    -
       // Creates a new aggregation buffer and initializes buffer values.
    -  // This function should be only called at most three times (when we create the hash map,
    -  // when we switch to sort-based aggregation, and when we create the re-used buffer for
    -  // sort-based aggregation).
    +  // This function should be only called at most two times (when we create the hash map,
    +  // and when we create the re-used buffer for sort-based aggregation).
       private def createNewAggregationBuffer(): UnsafeRow = {
    -    val bufferSchema = allAggregateFunctions.flatMap(_.aggBufferAttributes)
    +    val bufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes)
         val buffer: UnsafeRow = UnsafeProjection.create(bufferSchema.map(_.dataType))
           .apply(new GenericMutableRow(bufferSchema.length))
         // Initialize declarative aggregates' buffer values
         expressionAggInitialProjection.target(buffer)(EmptyRow)
         // Initialize imperative aggregates' buffer values
    -    allAggregateFunctions.collect { case f: ImperativeAggregate => f }.foreach(_.initialize(buffer))
    +    aggregateFunctions.collect { case f: ImperativeAggregate => f }.foreach(_.initialize(buffer))
         buffer
       }
     
    -  // Creates a function used to process a row based on the given inputAttributes.
    -  private def generateProcessRow(
    -      inputAttributes: Seq[Attribute]): (UnsafeRow, InternalRow) => Unit = {
    -
    -    val aggregationBufferAttributes = allAggregateFunctions.flatMap(_.aggBufferAttributes)
    -    val joinedRow = new JoinedRow()
    -
    -    aggregationMode match {
    -      // Partial-only
    -      case (Some(Partial), None) =>
    -        val updateExpressions = allAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.updateExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        val imperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          allAggregateFunctions.collect { case func: ImperativeAggregate => func}
    -        val expressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferAttributes ++ inputAttributes)()
    -
    -        (currentBuffer: UnsafeRow, row: InternalRow) => {
    -          expressionAggUpdateProjection.target(currentBuffer)
    -          // Process all expression-based aggregate functions.
    -          expressionAggUpdateProjection(joinedRow(currentBuffer, row))
    -          // Process all imperative aggregate functions
    -          var i = 0
    -          while (i < imperativeAggregateFunctions.length) {
    -            imperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // PartialMerge-only or Final-only
    -      case (Some(PartialMerge), None) | (Some(Final), None) =>
    -        val mergeExpressions = allAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.mergeExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        val imperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          allAggregateFunctions.collect { case func: ImperativeAggregate => func}
    -        // This projection is used to merge buffer values for all expression-based aggregates.
    -        val expressionAggMergeProjection =
    -          newMutableProjection(mergeExpressions, aggregationBufferAttributes ++ inputAttributes)()
    -
    -        (currentBuffer: UnsafeRow, row: InternalRow) => {
    -          // Process all expression-based aggregate functions.
    -          expressionAggMergeProjection.target(currentBuffer)(joinedRow(currentBuffer, row))
    -          // Process all imperative aggregate functions.
    -          var i = 0
    -          while (i < imperativeAggregateFunctions.length) {
    -            imperativeAggregateFunctions(i).merge(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // Final-Complete
    -      case (Some(Final), Some(Complete)) =>
    -        val completeAggregateFunctions: Array[AggregateFunction] =
    -          allAggregateFunctions.takeRight(completeAggregateExpressions.length)
    -        val completeImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          completeAggregateFunctions.collect { case func: ImperativeAggregate => func }
    -        val nonCompleteAggregateFunctions: Array[AggregateFunction] =
    -          allAggregateFunctions.take(nonCompleteAggregateExpressions.length)
    -        val nonCompleteImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          nonCompleteAggregateFunctions.collect { case func: ImperativeAggregate => func }
    -
    -        val completeOffsetExpressions =
    -          Seq.fill(completeAggregateFunctions.map(_.aggBufferAttributes.length).sum)(NoOp)
    -        val mergeExpressions =
    -          nonCompleteAggregateFunctions.flatMap {
    -            case ae: DeclarativeAggregate => ae.mergeExpressions
    -            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -          } ++ completeOffsetExpressions
    -        val finalMergeProjection =
    -          newMutableProjection(mergeExpressions, aggregationBufferAttributes ++ inputAttributes)()
    -
    -        // We do not touch buffer values of aggregate functions with the Final mode.
    -        val finalOffsetExpressions =
    -          Seq.fill(nonCompleteAggregateFunctions.map(_.aggBufferAttributes.length).sum)(NoOp)
    -        val updateExpressions = finalOffsetExpressions ++ completeAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.updateExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        val completeUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferAttributes ++ inputAttributes)()
    -
    -        (currentBuffer: UnsafeRow, row: InternalRow) => {
    -          val input = joinedRow(currentBuffer, row)
    -          // For all aggregate functions with mode Complete, update buffers.
    -          completeUpdateProjection.target(currentBuffer)(input)
    -          var i = 0
    -          while (i < completeImperativeAggregateFunctions.length) {
    -            completeImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -
    -          // For all aggregate functions with mode Final, merge buffer values in row to
    -          // currentBuffer.
    -          finalMergeProjection.target(currentBuffer)(input)
    -          i = 0
    -          while (i < nonCompleteImperativeAggregateFunctions.length) {
    -            nonCompleteImperativeAggregateFunctions(i).merge(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // Complete-only
    -      case (None, Some(Complete)) =>
    -        val completeAggregateFunctions: Array[AggregateFunction] =
    -          allAggregateFunctions.takeRight(completeAggregateExpressions.length)
    -        // All imperative aggregate functions with mode Complete.
    -        val completeImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          completeAggregateFunctions.collect { case func: ImperativeAggregate => func }
    -
    -        val updateExpressions = completeAggregateFunctions.flatMap {
    -          case ae: DeclarativeAggregate => ae.updateExpressions
    -          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
    -        }
    -        val completeExpressionAggUpdateProjection =
    -          newMutableProjection(updateExpressions, aggregationBufferAttributes ++ inputAttributes)()
    -
    -        (currentBuffer: UnsafeRow, row: InternalRow) => {
    -          // For all aggregate functions with mode Complete, update buffers.
    -          completeExpressionAggUpdateProjection.target(currentBuffer)(joinedRow(currentBuffer, row))
    -          var i = 0
    -          while (i < completeImperativeAggregateFunctions.length) {
    -            completeImperativeAggregateFunctions(i).update(currentBuffer, row)
    -            i += 1
    -          }
    -        }
    -
    -      // Grouping only.
    -      case (None, None) => (currentBuffer: UnsafeRow, row: InternalRow) => {}
    -
    -      case other =>
    -        throw new IllegalStateException(
    -          s"${aggregationMode} should not be passed into TungstenAggregationIterator.")
    -    }
    -  }
    -
       // Creates a function used to generate output rows.
    -  private def generateResultProjection(): (UnsafeRow, UnsafeRow) => UnsafeRow = {
    -
    -    val groupingAttributes = groupingExpressions.map(_.toAttribute)
    -    val bufferAttributes = allAggregateFunctions.flatMap(_.aggBufferAttributes)
    -
    -    aggregationMode match {
    -      // Partial-only or PartialMerge-only: every output row is basically the values of
    -      // the grouping expressions and the corresponding aggregation buffer.
    -      case (Some(Partial), None) | (Some(PartialMerge), None) =>
    -        val groupingKeySchema = StructType.fromAttributes(groupingAttributes)
    -        val bufferSchema = StructType.fromAttributes(bufferAttributes)
    -        val unsafeRowJoiner = GenerateUnsafeRowJoiner.create(groupingKeySchema, bufferSchema)
    -
    -        (currentGroupingKey: UnsafeRow, currentBuffer: UnsafeRow) => {
    -          unsafeRowJoiner.join(currentGroupingKey, currentBuffer)
    -        }
    -
    -      // Final-only, Complete-only and Final-Complete: a output row is generated based on
    -      // resultExpressions.
    -      case (Some(Final), None) | (Some(Final) | None, Some(Complete)) =>
    -        val joinedRow = new JoinedRow()
    -        val evalExpressions = allAggregateFunctions.map {
    -          case ae: DeclarativeAggregate => ae.evaluateExpression
    -          case agg: AggregateFunction => NoOp
    -        }
    -        val expressionAggEvalProjection = newMutableProjection(evalExpressions, bufferAttributes)()
    -        // These are the attributes of the row produced by `expressionAggEvalProjection`
    -        val aggregateResultSchema = nonCompleteAggregateAttributes ++ completeAggregateAttributes
    -        val aggregateResult = new SpecificMutableRow(aggregateResultSchema.map(_.dataType))
    -        expressionAggEvalProjection.target(aggregateResult)
    -        val resultProjection =
    -          UnsafeProjection.create(resultExpressions, groupingAttributes ++ aggregateResultSchema)
    -
    -        val allImperativeAggregateFunctions: Array[ImperativeAggregate] =
    -          allAggregateFunctions.collect { case func: ImperativeAggregate => func}
    -
    -        (currentGroupingKey: UnsafeRow, currentBuffer: UnsafeRow) => {
    -          // Generate results for all expression-based aggregate functions.
    -          expressionAggEvalProjection(currentBuffer)
    -          // Generate results for all imperative aggregate functions.
    -          var i = 0
    -          while (i < allImperativeAggregateFunctions.length) {
    -            aggregateResult.update(
    -              allImperativeAggregateFunctionPositions(i),
    -              allImperativeAggregateFunctions(i).eval(currentBuffer))
    -            i += 1
    -          }
    -          resultProjection(joinedRow(currentGroupingKey, aggregateResult))
    -        }
    -
    -      // Grouping-only: a output row is generated from values of grouping expressions.
    -      case (None, None) =>
    -        val resultProjection = UnsafeProjection.create(resultExpressions, groupingAttributes)
    -
    -        (currentGroupingKey: UnsafeRow, currentBuffer: UnsafeRow) => {
    -          resultProjection(currentGroupingKey)
    -        }
    -
    -      case other =>
    -        throw new IllegalStateException(
    -          s"${aggregationMode} should not be passed into TungstenAggregationIterator.")
    +  override def generateResultProjection(): (UnsafeRow, MutableRow) => UnsafeRow = {
    +    val modes = aggregateExpressions.map(_.mode).distinct
    +    if (modes.nonEmpty && !modes.contains(Final) && !modes.contains(Complete)) {
    +      // Fast path for partial aggregation
    --- End diff --
    
    Add a comment to explain why it is a fast path?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12213] [SQL] use multiple partitions fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10228#issuecomment-163362585
  
    **[Test build #47442 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47442/consoleFull)** for PR 10228 at commit [`e3f2e79`](https://github.com/apache/spark/commit/e3f2e79a5e6da3fcfe7d0441dd00cb31f3a30992).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org