You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by davies <gi...@git.apache.org> on 2016/01/21 01:29:00 UTC

[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

GitHub user davies opened a pull request:

    https://github.com/apache/spark/pull/10855

    [SPARK-12914] [SQL] generate aggregation with grouping keys

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/davies/spark gen_keys

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/10855.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #10855
    
----
commit 8f9447c688dee93e901c3be83313761168b65008
Author: Davies Liu <da...@databricks.com>
Date:   2016-01-21T00:23:04Z

    generate aggregation with grouping keys

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10855#discussion_r50919040
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java ---
    @@ -34,7 +36,7 @@
       // used when there is no column in output
       protected UnsafeRow unsafeRow = new UnsafeRow(0);
     
    -  public boolean hasNext() {
    +  public boolean hasNext() throws IOException {
    --- End diff --
    
    Why'd you change this to throw exceptions?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-174791574
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50063/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176996073
  
    **[Test build #2479 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2479/consoleFull)** for PR 10855 at commit [`940c88d`](https://github.com/apache/spark/commit/940c88df74be1f7cc9f62aa19c17dda31266cce4).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-174782562
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176971256
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-175474226
  
    @nongli I had pulled out the unrelated changes into #10944, we should review and merge that one first.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-173467214
  
    @nongli @rxin  This PR is ready for review now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-173482999
  
    **[Test build #49862 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49862/consoleFull)** for PR 10855 at commit [`7880786`](https://github.com/apache/spark/commit/788078668795458aa29a55d18e2b23686992df8d).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176478372
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50307/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176470765
  
    **[Test build #50307 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50307/consoleFull)** for PR 10855 at commit [`efe7fa2`](https://github.com/apache/spark/commit/efe7fa26d0ed49c09ec886185713f235c645570f).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-173437954
  
    **[Test build #49847 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49847/consoleFull)** for PR 10855 at commit [`a98bc05`](https://github.com/apache/spark/commit/a98bc05bf0542f5fdb8b54cb36651448832cfa8c).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-175479555
  
    **[Test build #50178 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50178/consoleFull)** for PR 10855 at commit [`48e125c`](https://github.com/apache/spark/commit/48e125cd7623af1af2b9f82b9ca8ddeca438ad17).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-173461714
  
    **[Test build #49855 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49855/consoleFull)** for PR 10855 at commit [`7d1bd43`](https://github.com/apache/spark/commit/7d1bd43aafd7c38120b9508830e7a22db11371b4).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-173440579
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176944415
  
    **[Test build #50400 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50400/consoleFull)** for PR 10855 at commit [`d3c2406`](https://github.com/apache/spark/commit/d3c240692bc802d28646e5e8a9c96471fd500952).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-177031168
  
    **[Test build #50420 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50420/consoleFull)** for PR 10855 at commit [`caad24f`](https://github.com/apache/spark/commit/caad24f414fde59e8563d05eba76fa5cf448ddf6).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176987981
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50411/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-174781705
  
    **[Test build #50060 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50060/consoleFull)** for PR 10855 at commit [`9cc7925`](https://github.com/apache/spark/commit/9cc79254c38d97fe136d181518356c650114536f).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176498690
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50312/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10855#discussion_r50918676
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala ---
    @@ -137,61 +157,297 @@ case class TungstenAggregate(
         bufVars = initExpr.map { e =>
           val isNull = ctx.freshName("bufIsNull")
           val value = ctx.freshName("bufValue")
    +      ctx.addMutableState("boolean", isNull, "")
    +      ctx.addMutableState(ctx.javaType(e.dataType), value, "")
           // The initial expression should not access any column
           val ev = e.gen(ctx)
    -      val initVars = s"""
    -         | boolean $isNull = ${ev.isNull};
    -         | ${ctx.javaType(e.dataType)} $value = ${ev.value};
    -       """.stripMargin
    +      val initVars =
    +        s"""
    +         $isNull = ${ev.isNull};
    +         $value = ${ev.value};
    +       """
           ExprCode(ev.code + initVars, isNull, value)
         }
     
    -    val (rdd, childSource) = child.asInstanceOf[CodegenSupport].produce(ctx, this)
    -    val source =
    +    // generate variables for output
    +    val (resultVars, genResult) = if (modes.contains(Final) |modes.contains(Complete)) {
    +      // evaluate aggregate results
    +      ctx.currentVars = bufVars
    +      val bufferAttrs = functions.flatMap(_.aggBufferAttributes)
    +      val aggResults = functions.map(_.evaluateExpression).map { e =>
    +        BindReferences.bindReference(e, bufferAttrs).gen(ctx)
    +      }
    +      // evaluate result expressions
    +      ctx.currentVars = aggResults
    +      val resultVars = resultExpressions.map { e =>
    +        BindReferences.bindReference(e, aggregateAttributes).gen(ctx)
    +      }
    +      (resultVars, s"""
    +         ${aggResults.map(_.code).mkString("\n")}
    +         ${resultVars.map(_.code).mkString("\n")}
    +       """)
    +    } else {
    +      // output the aggregate buffer directly
    +      (bufVars, "")
    +    }
    +
    +    val doAgg = ctx.freshName("doAgg")
    +    ctx.addNewFunction(doAgg,
           s"""
    -         | if (!$initAgg) {
    -         |   $initAgg = true;
    -         |
    -         |   // initialize aggregation buffer
    -         |   ${bufVars.map(_.code).mkString("\n")}
    -         |
    -         |   $childSource
    -         |
    -         |   // output the result
    -         |   ${consume(ctx, bufVars)}
    -         | }
    -       """.stripMargin
    -
    -    (rdd, source)
    +        private void $doAgg() throws java.io.IOException {
    +          // initialize aggregation buffer
    +          ${bufVars.map(_.code).mkString("\n")}
    +
    +          ${child.asInstanceOf[CodegenSupport].produce(ctx, this)}
    +        }
    +       """)
    +
    +    s"""
    +    if (!$initAgg) {
    +      $initAgg = true;
    +      $doAgg();
    +
    +      // output the result
    +      $genResult
    +
    +      ${consume(ctx, resultVars)}
    +    }
    +    """
       }
     
    -  override def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String = {
    +  private def doConsumeWithoutKeys(
    +      ctx: CodegenContext,
    +      child: SparkPlan,
    +      input: Seq[ExprCode]): String = {
         // only have DeclarativeAggregate
         val functions = aggregateExpressions.map(_.aggregateFunction.asInstanceOf[DeclarativeAggregate])
    -    // the mode could be only Partial or PartialMerge
    -    val updateExpr = if (modes.contains(Partial)) {
    -      functions.flatMap(_.updateExpressions)
    +    val inputAttrs = functions.flatMap(_.aggBufferAttributes) ++ child.output
    +    val updateExpr = aggregateExpressions.flatMap { e =>
    +      e.mode match {
    +        case Partial | Complete =>
    +          e.aggregateFunction.asInstanceOf[DeclarativeAggregate].updateExpressions
    +        case PartialMerge | Final =>
    +          e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
    +      }
    +    }
    +    ctx.currentVars = bufVars ++ input
    +    // TODO: support subexpression elimination
    +    val updates = updateExpr.zipWithIndex.map { case (e, i) =>
    +      val ev = BindReferences.bindReference[Expression](e, inputAttrs).gen(ctx)
    +      s"""
    +       ${ev.code}
    +       ${bufVars(i).isNull} = ${ev.isNull};
    +       ${bufVars(i).value} = ${ev.value};
    +       """
    +    }
    +
    +    s"""
    +     // do aggregate and update aggregation buffer
    +     ${updates.mkString("")}
    +     """
    +  }
    +
    +  val groupingAttributes = groupingExpressions.map(_.toAttribute)
    +  val groupingKeySchema = StructType.fromAttributes(groupingAttributes)
    +  val declFunctions = aggregateExpressions.map(_.aggregateFunction)
    +    .filter(_.isInstanceOf[DeclarativeAggregate])
    +    .map(_.asInstanceOf[DeclarativeAggregate])
    +  val bufferAttributes = declFunctions.flatMap(_.aggBufferAttributes)
    +  val bufferSchema = StructType.fromAttributes(bufferAttributes)
    +
    +  // The name for HashMap
    +  var hashMapTerm: String = _
    +
    +  def createHashMap(): UnsafeFixedWidthAggregationMap = {
    +    // create initialized aggregate buffer
    +    val initExpr = declFunctions.flatMap(f => f.initialValues)
    +    val initialBuffer = UnsafeProjection.create(initExpr)(EmptyRow)
    +
    +    // create hashMap
    +    new UnsafeFixedWidthAggregationMap(
    +      initialBuffer,
    +      bufferSchema,
    +      groupingKeySchema,
    +      TaskContext.get().taskMemoryManager(),
    +      1024 * 16, // initial capacity
    +      TaskContext.get().taskMemoryManager().pageSizeBytes,
    +      false // disable tracking of performance metrics
    +    )
    +  }
    +
    +  def createUnsafeJoiner(): UnsafeRowJoiner = {
    +    GenerateUnsafeRowJoiner.create(groupingKeySchema, bufferSchema)
    +  }
    +
    +  private def doProduceWithKeys(ctx: CodegenContext): String = {
    +    val initAgg = ctx.freshName("initAgg")
    +    ctx.addMutableState("boolean", initAgg, s"$initAgg = false;")
    +
    +    // create hashMap
    +    val thisPlan = ctx.addReferenceObj("tungstenAggregate", this)
    +    hashMapTerm = ctx.freshName("hashMap")
    +    val hashMapClassName = classOf[UnsafeFixedWidthAggregationMap].getName
    +    ctx.addMutableState(hashMapClassName, hashMapTerm, s"$hashMapTerm = $thisPlan.createHashMap();")
    +
    +    // Create a name for iterator from HashMap
    +    val iterTerm = ctx.freshName("mapIter")
    +    ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName, iterTerm, "")
    +
    +    // generate code for output
    +    val keyTerm = ctx.freshName("aggKey")
    +    val bufferTerm = ctx.freshName("aggBuffer")
    +    val outputCode = if (modes.contains(Final) |modes.contains(Complete)) {
    +      // generate output using resultExpressions
    +      ctx.currentVars = null
    +      ctx.INPUT_ROW = keyTerm
    +      val keyVars = groupingExpressions.zipWithIndex.map { case (e, i) =>
    +          BoundReference(i, e.dataType, e.nullable).gen(ctx)
    +      }
    +      ctx.INPUT_ROW = bufferTerm
    +      val bufferVars = bufferAttributes.zipWithIndex.map { case (e, i) =>
    +        BoundReference(i, e.dataType, e.nullable).gen(ctx)
    +      }
    +      // evaluate the aggregation result
    +      ctx.currentVars = bufferVars
    +      val aggResults = declFunctions.map(_.evaluateExpression).map { e =>
    +        BindReferences.bindReference(e, bufferAttributes).gen(ctx)
    +      }
    +      // generate the final result
    +      ctx.currentVars = keyVars ++ aggResults
    +      val inputAttrs = groupingAttributes ++ aggregateAttributes
    +      val resultVars = resultExpressions.map { e =>
    +        BindReferences.bindReference(e, inputAttrs).gen(ctx)
    +      }
    +      s"""
    +       ${keyVars.map(_.code).mkString("\n")}
    +       ${bufferVars.map(_.code).mkString("\n")}
    +       ${aggResults.map(_.code).mkString("\n")}
    +       ${resultVars.map(_.code).mkString("\n")}
    +
    +       ${consume(ctx, resultVars)}
    +       """
    +
    +    } else if (modes.contains(Partial) |modes.contains(PartialMerge)) {
    +      // This should be the last operator in a stage, we should output UnsafeRow directly
    +      val joinerTerm = ctx.freshName("unsafeRowJoiner")
    +      ctx.addMutableState(classOf[UnsafeRowJoiner].getName, joinerTerm,
    +        s"$joinerTerm = $thisPlan.createUnsafeJoiner();")
    +      val resultRow = ctx.freshName("resultRow")
    +      s"""
    +       UnsafeRow $resultRow = $joinerTerm.join($keyTerm, $bufferTerm);
    +       ${consume(ctx, null, resultRow)}
    +       """
    +
         } else {
    -      functions.flatMap(_.mergeExpressions)
    +      // only grouping key
    +      ctx.INPUT_ROW = keyTerm
    +      ctx.currentVars = null
    +      val eval = resultExpressions.map{ e =>
    +        BindReferences.bindReference(e, groupingAttributes).gen(ctx)
    +      }
    +      s"""
    +       ${eval.map(_.code).mkString("\n")}
    +       ${consume(ctx, eval)}
    +       """
    +    }
    +
    +    val doAgg = ctx.freshName("doAgg")
    +    ctx.addNewFunction(doAgg,
    +      s"""
    +        private void $doAgg() throws java.io.IOException {
    +          ${child.asInstanceOf[CodegenSupport].produce(ctx, this)}
    +
    +          $iterTerm = $hashMapTerm.iterator();
    +        }
    +       """)
    +
    +    s"""
    +     if (!$initAgg) {
    +       $initAgg = true;
    +       $doAgg();
    +     }
    +
    +     // output the result
    +     while ($iterTerm.next()) {
    +       UnsafeRow $keyTerm = (UnsafeRow) $iterTerm.getKey();
    --- End diff --
    
    Do we need this row at all? This is the start of a new pipeline right? (Not for this patch)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10855#discussion_r50918156
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala ---
    @@ -137,61 +157,297 @@ case class TungstenAggregate(
         bufVars = initExpr.map { e =>
           val isNull = ctx.freshName("bufIsNull")
           val value = ctx.freshName("bufValue")
    +      ctx.addMutableState("boolean", isNull, "")
    +      ctx.addMutableState(ctx.javaType(e.dataType), value, "")
           // The initial expression should not access any column
           val ev = e.gen(ctx)
    -      val initVars = s"""
    -         | boolean $isNull = ${ev.isNull};
    -         | ${ctx.javaType(e.dataType)} $value = ${ev.value};
    -       """.stripMargin
    +      val initVars =
    +        s"""
    +         $isNull = ${ev.isNull};
    +         $value = ${ev.value};
    +       """
           ExprCode(ev.code + initVars, isNull, value)
         }
     
    -    val (rdd, childSource) = child.asInstanceOf[CodegenSupport].produce(ctx, this)
    -    val source =
    +    // generate variables for output
    +    val (resultVars, genResult) = if (modes.contains(Final) |modes.contains(Complete)) {
    +      // evaluate aggregate results
    +      ctx.currentVars = bufVars
    +      val bufferAttrs = functions.flatMap(_.aggBufferAttributes)
    +      val aggResults = functions.map(_.evaluateExpression).map { e =>
    +        BindReferences.bindReference(e, bufferAttrs).gen(ctx)
    +      }
    +      // evaluate result expressions
    +      ctx.currentVars = aggResults
    +      val resultVars = resultExpressions.map { e =>
    +        BindReferences.bindReference(e, aggregateAttributes).gen(ctx)
    +      }
    +      (resultVars, s"""
    +         ${aggResults.map(_.code).mkString("\n")}
    +         ${resultVars.map(_.code).mkString("\n")}
    +       """)
    +    } else {
    +      // output the aggregate buffer directly
    +      (bufVars, "")
    +    }
    +
    +    val doAgg = ctx.freshName("doAgg")
    +    ctx.addNewFunction(doAgg,
           s"""
    -         | if (!$initAgg) {
    -         |   $initAgg = true;
    -         |
    -         |   // initialize aggregation buffer
    -         |   ${bufVars.map(_.code).mkString("\n")}
    -         |
    -         |   $childSource
    -         |
    -         |   // output the result
    -         |   ${consume(ctx, bufVars)}
    -         | }
    -       """.stripMargin
    -
    -    (rdd, source)
    +        private void $doAgg() throws java.io.IOException {
    +          // initialize aggregation buffer
    +          ${bufVars.map(_.code).mkString("\n")}
    +
    +          ${child.asInstanceOf[CodegenSupport].produce(ctx, this)}
    +        }
    +       """)
    +
    +    s"""
    +    if (!$initAgg) {
    +      $initAgg = true;
    +      $doAgg();
    +
    +      // output the result
    +      $genResult
    +
    +      ${consume(ctx, resultVars)}
    +    }
    +    """
       }
     
    -  override def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String = {
    +  private def doConsumeWithoutKeys(
    +      ctx: CodegenContext,
    +      child: SparkPlan,
    +      input: Seq[ExprCode]): String = {
         // only have DeclarativeAggregate
         val functions = aggregateExpressions.map(_.aggregateFunction.asInstanceOf[DeclarativeAggregate])
    -    // the mode could be only Partial or PartialMerge
    -    val updateExpr = if (modes.contains(Partial)) {
    -      functions.flatMap(_.updateExpressions)
    +    val inputAttrs = functions.flatMap(_.aggBufferAttributes) ++ child.output
    +    val updateExpr = aggregateExpressions.flatMap { e =>
    +      e.mode match {
    +        case Partial | Complete =>
    +          e.aggregateFunction.asInstanceOf[DeclarativeAggregate].updateExpressions
    +        case PartialMerge | Final =>
    +          e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
    +      }
    +    }
    +    ctx.currentVars = bufVars ++ input
    +    // TODO: support subexpression elimination
    +    val updates = updateExpr.zipWithIndex.map { case (e, i) =>
    +      val ev = BindReferences.bindReference[Expression](e, inputAttrs).gen(ctx)
    +      s"""
    +       ${ev.code}
    +       ${bufVars(i).isNull} = ${ev.isNull};
    +       ${bufVars(i).value} = ${ev.value};
    +       """
    +    }
    +
    +    s"""
    +     // do aggregate and update aggregation buffer
    +     ${updates.mkString("")}
    +     """
    +  }
    +
    +  val groupingAttributes = groupingExpressions.map(_.toAttribute)
    +  val groupingKeySchema = StructType.fromAttributes(groupingAttributes)
    +  val declFunctions = aggregateExpressions.map(_.aggregateFunction)
    +    .filter(_.isInstanceOf[DeclarativeAggregate])
    +    .map(_.asInstanceOf[DeclarativeAggregate])
    +  val bufferAttributes = declFunctions.flatMap(_.aggBufferAttributes)
    +  val bufferSchema = StructType.fromAttributes(bufferAttributes)
    +
    +  // The name for HashMap
    +  var hashMapTerm: String = _
    +
    +  def createHashMap(): UnsafeFixedWidthAggregationMap = {
    +    // create initialized aggregate buffer
    +    val initExpr = declFunctions.flatMap(f => f.initialValues)
    +    val initialBuffer = UnsafeProjection.create(initExpr)(EmptyRow)
    +
    +    // create hashMap
    +    new UnsafeFixedWidthAggregationMap(
    +      initialBuffer,
    +      bufferSchema,
    +      groupingKeySchema,
    +      TaskContext.get().taskMemoryManager(),
    +      1024 * 16, // initial capacity
    +      TaskContext.get().taskMemoryManager().pageSizeBytes,
    +      false // disable tracking of performance metrics
    +    )
    +  }
    +
    +  def createUnsafeJoiner(): UnsafeRowJoiner = {
    --- End diff --
    
    can you consistently use "private def"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-174783577
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50058/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-173440554
  
    **[Test build #49847 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49847/consoleFull)** for PR 10855 at commit [`a98bc05`](https://github.com/apache/spark/commit/a98bc05bf0542f5fdb8b54cb36651448832cfa8c).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176982733
  
    **[Test build #2477 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2477/consoleFull)** for PR 10855 at commit [`d3c2406`](https://github.com/apache/spark/commit/d3c240692bc802d28646e5e8a9c96471fd500952).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176951652
  
    **[Test build #2475 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2475/consoleFull)** for PR 10855 at commit [`d3c2406`](https://github.com/apache/spark/commit/d3c240692bc802d28646e5e8a9c96471fd500952).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176932052
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50397/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-173483104
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49862/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176478369
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-173440580
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49847/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176993328
  
    **[Test build #50412 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50412/consoleFull)** for PR 10855 at commit [`940c88d`](https://github.com/apache/spark/commit/940c88df74be1f7cc9f62aa19c17dda31266cce4).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10855#discussion_r50952341
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala ---
    @@ -137,61 +157,297 @@ case class TungstenAggregate(
         bufVars = initExpr.map { e =>
           val isNull = ctx.freshName("bufIsNull")
           val value = ctx.freshName("bufValue")
    +      ctx.addMutableState("boolean", isNull, "")
    +      ctx.addMutableState(ctx.javaType(e.dataType), value, "")
           // The initial expression should not access any column
           val ev = e.gen(ctx)
    -      val initVars = s"""
    -         | boolean $isNull = ${ev.isNull};
    -         | ${ctx.javaType(e.dataType)} $value = ${ev.value};
    -       """.stripMargin
    +      val initVars =
    +        s"""
    +         $isNull = ${ev.isNull};
    +         $value = ${ev.value};
    +       """
           ExprCode(ev.code + initVars, isNull, value)
         }
     
    -    val (rdd, childSource) = child.asInstanceOf[CodegenSupport].produce(ctx, this)
    -    val source =
    +    // generate variables for output
    +    val (resultVars, genResult) = if (modes.contains(Final) |modes.contains(Complete)) {
    +      // evaluate aggregate results
    +      ctx.currentVars = bufVars
    +      val bufferAttrs = functions.flatMap(_.aggBufferAttributes)
    +      val aggResults = functions.map(_.evaluateExpression).map { e =>
    +        BindReferences.bindReference(e, bufferAttrs).gen(ctx)
    +      }
    +      // evaluate result expressions
    +      ctx.currentVars = aggResults
    +      val resultVars = resultExpressions.map { e =>
    +        BindReferences.bindReference(e, aggregateAttributes).gen(ctx)
    +      }
    +      (resultVars, s"""
    +         ${aggResults.map(_.code).mkString("\n")}
    +         ${resultVars.map(_.code).mkString("\n")}
    +       """)
    +    } else {
    +      // output the aggregate buffer directly
    +      (bufVars, "")
    +    }
    +
    +    val doAgg = ctx.freshName("doAgg")
    +    ctx.addNewFunction(doAgg,
           s"""
    -         | if (!$initAgg) {
    -         |   $initAgg = true;
    -         |
    -         |   // initialize aggregation buffer
    -         |   ${bufVars.map(_.code).mkString("\n")}
    -         |
    -         |   $childSource
    -         |
    -         |   // output the result
    -         |   ${consume(ctx, bufVars)}
    -         | }
    -       """.stripMargin
    -
    -    (rdd, source)
    +        private void $doAgg() throws java.io.IOException {
    +          // initialize aggregation buffer
    +          ${bufVars.map(_.code).mkString("\n")}
    +
    +          ${child.asInstanceOf[CodegenSupport].produce(ctx, this)}
    +        }
    +       """)
    +
    +    s"""
    +    if (!$initAgg) {
    +      $initAgg = true;
    +      $doAgg();
    +
    +      // output the result
    +      $genResult
    +
    +      ${consume(ctx, resultVars)}
    +    }
    +    """
       }
     
    -  override def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String = {
    +  private def doConsumeWithoutKeys(
    +      ctx: CodegenContext,
    +      child: SparkPlan,
    +      input: Seq[ExprCode]): String = {
         // only have DeclarativeAggregate
         val functions = aggregateExpressions.map(_.aggregateFunction.asInstanceOf[DeclarativeAggregate])
    -    // the mode could be only Partial or PartialMerge
    -    val updateExpr = if (modes.contains(Partial)) {
    -      functions.flatMap(_.updateExpressions)
    +    val inputAttrs = functions.flatMap(_.aggBufferAttributes) ++ child.output
    +    val updateExpr = aggregateExpressions.flatMap { e =>
    +      e.mode match {
    +        case Partial | Complete =>
    +          e.aggregateFunction.asInstanceOf[DeclarativeAggregate].updateExpressions
    +        case PartialMerge | Final =>
    +          e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
    +      }
    +    }
    +    ctx.currentVars = bufVars ++ input
    +    // TODO: support subexpression elimination
    +    val updates = updateExpr.zipWithIndex.map { case (e, i) =>
    +      val ev = BindReferences.bindReference[Expression](e, inputAttrs).gen(ctx)
    +      s"""
    +       ${ev.code}
    +       ${bufVars(i).isNull} = ${ev.isNull};
    +       ${bufVars(i).value} = ${ev.value};
    +       """
    +    }
    +
    +    s"""
    +     // do aggregate and update aggregation buffer
    +     ${updates.mkString("")}
    +     """
    +  }
    +
    +  val groupingAttributes = groupingExpressions.map(_.toAttribute)
    +  val groupingKeySchema = StructType.fromAttributes(groupingAttributes)
    +  val declFunctions = aggregateExpressions.map(_.aggregateFunction)
    +    .filter(_.isInstanceOf[DeclarativeAggregate])
    +    .map(_.asInstanceOf[DeclarativeAggregate])
    +  val bufferAttributes = declFunctions.flatMap(_.aggBufferAttributes)
    +  val bufferSchema = StructType.fromAttributes(bufferAttributes)
    +
    +  // The name for HashMap
    +  var hashMapTerm: String = _
    +
    +  def createHashMap(): UnsafeFixedWidthAggregationMap = {
    +    // create initialized aggregate buffer
    +    val initExpr = declFunctions.flatMap(f => f.initialValues)
    +    val initialBuffer = UnsafeProjection.create(initExpr)(EmptyRow)
    +
    +    // create hashMap
    +    new UnsafeFixedWidthAggregationMap(
    +      initialBuffer,
    +      bufferSchema,
    +      groupingKeySchema,
    +      TaskContext.get().taskMemoryManager(),
    +      1024 * 16, // initial capacity
    +      TaskContext.get().taskMemoryManager().pageSizeBytes,
    +      false // disable tracking of performance metrics
    +    )
    +  }
    +
    +  def createUnsafeJoiner(): UnsafeRowJoiner = {
    --- End diff --
    
    These methods are called in generated class, so should be public.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10855#discussion_r50915674
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala ---
    @@ -42,9 +43,14 @@ trait CodegenSupport extends SparkPlan {
       private var parent: CodegenSupport = null
     
       /**
    +    * Returns the RDD of InternalRow which generates the input rows.
    --- End diff --
    
    Can you comment why this is useful?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176963141
  
    ```
    /* 001 */
    /* 002 */ public Object generate(Object[] references) {
    /* 003 */   return new GeneratedIterator(references);
    /* 004 */ }
    /* 005 */
    /* 006 */ class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 007 */
    /* 008 */   private Object[] references;
    /* 009 */   private boolean TungstenAggregate_initAgg0;
    /* 010 */   private org.apache.spark.sql.execution.aggregate.TungstenAggregate TungstenAggregate_plan1;
    /* 011 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap TungstenAggregate_hashMap2;
    /* 012 */   private org.apache.spark.unsafe.KVIterator TungstenAggregate_mapIter3;
    /* 013 */   private UnsafeRow TungstenAggregate_result10;
    /* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder TungstenAggregate_holder11;
    /* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter TungstenAggregate_rowWriter12;
    /* 016 */   private boolean Range_initRange15;
    /* 017 */   private long Range_partitionEnd16;
    /* 018 */   private long Range_number17;
    /* 019 */   private boolean Range_overflow18;
    /* 020 */   private UnsafeRow TungstenAggregate_result28;
    /* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder TungstenAggregate_holder29;
    /* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter TungstenAggregate_rowWriter30;
    /* 023 */
    /* 024 */   private void TungstenAggregate_doAggregateWithKeys14() throws java.io.IOException {
    /* 025 */
    /* 026 */     // initialize Range
    /* 027 */     if (!Range_initRange15) {
    /* 028 */       Range_initRange15 = true;
    /* 029 */       if (input.hasNext()) {
    /* 030 */         initRange(((InternalRow) input.next()).getInt(0));
    /* 031 */       } else {
    /* 032 */         return;
    /* 033 */       }
    /* 034 */     }
    /* 035 */
    /* 036 */     while (!Range_overflow18 && Range_number17 < Range_partitionEnd16) {
    /* 037 */       long Range_value19 = Range_number17;
    /* 038 */       Range_number17 += 1L;
    /* 039 */       if (Range_number17 < Range_value19 ^ 1L < 0) {
    /* 040 */         Range_overflow18 = true;
    /* 041 */       }
    /* 042 */
    /* 043 */       /* (input[0, bigint] & 65535) */
    /* 044 */       /* input[0, bigint] */
    /* 045 */
    /* 046 */       /* 65535 */
    /* 047 */
    /* 048 */       long Project_value21 = -1L;
    /* 049 */       Project_value21 = Range_value19 & 65535L;
    /* 050 */
    /* 051 */
    /* 052 */       // generate grouping key
    /* 053 */
    /* 054 */
    /* 055 */
    /* 056 */       /* input[0, bigint] */
    /* 057 */
    /* 058 */       TungstenAggregate_rowWriter30.write(0, Project_value21);
    /* 059 */
    /* 060 */
    /* 061 */       UnsafeRow TungstenAggregate_aggBuffer32 = TungstenAggregate_hashMap2.getAggregationBufferFromUnsafeRow(TungstenAggregate_result28);
    /* 062 */       if (TungstenAggregate_aggBuffer32 == null) {
    /* 063 */         // failed to allocate the first page
    /* 064 */         throw new OutOfMemoryError("No enough memory for aggregation");
    /* 065 */       }
    /* 066 */
    /* 067 */       // evaluate aggregate function
    /* 068 */
    /* 069 */       // update aggregate buffer
    /* 070 */
    /* 071 */
    /* 072 */
    /* 073 */     }
    /* 074 */
    /* 075 */
    /* 076 */     TungstenAggregate_mapIter3 = TungstenAggregate_hashMap2.iterator();
    /* 077 */   }
    /* 078 */
    /* 079 */
    /* 080 */   private void initRange(int idx) {
    /* 081 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
    /* 082 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(1L);
    /* 083 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(20971520L);
    /* 084 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
    /* 085 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
    /* 086 */
    /* 087 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
    /* 088 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
    /* 089 */       Range_number17 = Long.MAX_VALUE;
    /* 090 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
    /* 091 */       Range_number17 = Long.MIN_VALUE;
    /* 092 */     } else {
    /* 093 */       Range_number17 = st.longValue();
    /* 094 */     }
    /* 095 */
    /* 096 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
    /* 097 */     .multiply(step).add(start);
    /* 098 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
    /* 099 */       Range_partitionEnd16 = Long.MAX_VALUE;
    /* 100 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
    /* 101 */       Range_partitionEnd16 = Long.MIN_VALUE;
    /* 102 */     } else {
    /* 103 */       Range_partitionEnd16 = end.longValue();
    /* 104 */     }
    /* 105 */   }
    /* 106 */
    /* 107 */
    /* 108 */   public GeneratedIterator(Object[] references) {
    /* 109 */     this.references = references;
    /* 110 */     TungstenAggregate_initAgg0 = false;
    /* 111 */     this.TungstenAggregate_plan1 = (org.apache.spark.sql.execution.aggregate.TungstenAggregate) references[0];
    /* 112 */     TungstenAggregate_hashMap2 = TungstenAggregate_plan1.createHashMap();
    /* 113 */
    /* 114 */     TungstenAggregate_result10 = new UnsafeRow(1);
    /* 115 */     this.TungstenAggregate_holder11 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(TungstenAggregate_result10, 0);
    /* 116 */     this.TungstenAggregate_rowWriter12 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(TungstenAggregate_holder11, 1);
    /* 117 */     Range_initRange15 = false;
    /* 118 */     Range_partitionEnd16 = 0L;
    /* 119 */     Range_number17 = 0L;
    /* 120 */     Range_overflow18 = false;
    /* 121 */     TungstenAggregate_result28 = new UnsafeRow(1);
    /* 122 */     this.TungstenAggregate_holder29 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(TungstenAggregate_result28, 0);
    /* 123 */     this.TungstenAggregate_rowWriter30 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(TungstenAggregate_holder29, 1);
    /* 124 */   }
    /* 125 */
    /* 126 */   protected void processNext() throws java.io.IOException {
    /* 127 */
    /* 128 */     if (!TungstenAggregate_initAgg0) {
    /* 129 */       TungstenAggregate_initAgg0 = true;
    /* 130 */       TungstenAggregate_doAggregateWithKeys14();
    /* 131 */     }
    /* 132 */
    /* 133 */     // output the result
    /* 134 */     while (TungstenAggregate_mapIter3.next()) {
    /* 135 */       UnsafeRow TungstenAggregate_aggKey4 = (UnsafeRow) TungstenAggregate_mapIter3.getKey();
    /* 136 */       UnsafeRow TungstenAggregate_aggBuffer5 = (UnsafeRow) TungstenAggregate_mapIter3.getValue();
    /* 137 */
    /* 138 */       /* input[0, bigint] */
    /* 139 */       long TungstenAggregate_value7 = TungstenAggregate_aggKey4.getLong(0);
    /* 140 */
    /* 141 */       /* input[0, bigint] */
    /* 142 */
    /* 143 */       TungstenAggregate_rowWriter12.write(0, TungstenAggregate_value7);
    /* 144 */       currentRow = TungstenAggregate_result10;
    /* 145 */       return;
    /* 146 */
    /* 147 */
    /* 148 */     }
    /* 149 */
    /* 150 */     TungstenAggregate_hashMap2.free();
    /* 151 */
    /* 152 */   }
    /* 153 */ }
    /* 154 */
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10855#discussion_r50947591
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java ---
    @@ -34,7 +36,7 @@
       // used when there is no column in output
       protected UnsafeRow unsafeRow = new UnsafeRow(0);
     
    -  public boolean hasNext() {
    +  public boolean hasNext() throws IOException {
    --- End diff --
    
    The KVIterator will throws IOException, we need to propagate it here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-177031189
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50420/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-173461722
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176485958
  
    **[Test build #50312 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50312/consoleFull)** for PR 10855 at commit [`3bfdeb2`](https://github.com/apache/spark/commit/3bfdeb2cacebe8567f2d0123c853a20de84fa158).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-173468280
  
    **[Test build #49862 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49862/consoleFull)** for PR 10855 at commit [`7880786`](https://github.com/apache/spark/commit/788078668795458aa29a55d18e2b23686992df8d).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/10855


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176993427
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50412/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-177062354
  
    Merging this into master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-177051694
  
    **[Test build #2483 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2483/consoleFull)** for PR 10855 at commit [`caad24f`](https://github.com/apache/spark/commit/caad24f414fde59e8563d05eba76fa5cf448ddf6).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-173483103
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176498544
  
    **[Test build #50312 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50312/consoleFull)** for PR 10855 at commit [`3bfdeb2`](https://github.com/apache/spark/commit/3bfdeb2cacebe8567f2d0123c853a20de84fa158).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176961640
  
    **[Test build #2477 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2477/consoleFull)** for PR 10855 at commit [`d3c2406`](https://github.com/apache/spark/commit/d3c240692bc802d28646e5e8a9c96471fd500952).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-175457611
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50177/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176986209
  
    **[Test build #50412 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50412/consoleFull)** for PR 10855 at commit [`940c88d`](https://github.com/apache/spark/commit/940c88df74be1f7cc9f62aa19c17dda31266cce4).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176932048
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176926508
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50389/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-174782565
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50060/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176478326
  
    **[Test build #50307 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50307/consoleFull)** for PR 10855 at commit [`efe7fa2`](https://github.com/apache/spark/commit/efe7fa26d0ed49c09ec886185713f235c645570f).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-177031187
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176925677
  
    **[Test build #50389 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50389/consoleFull)** for PR 10855 at commit [`858c1e3`](https://github.com/apache/spark/commit/858c1e3b0ce20dad0ee23d2444267f5a640fac2a).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class LinearRegressionModel(JavaModel, MLWritable, MLReadable):`
      * `class JavaMLWriter(object):`
      * `class MLWritable(object):`
      * `class JavaMLReader(object):`
      * `        java_class = cls._java_loader_class(clazz)`
      * `class MLReadable(object):`
      * `case class SetDatabaseCommand(databaseName: String) extends RunnableCommand `


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-173413833
  
    **[Test build #49829 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49829/consoleFull)** for PR 10855 at commit [`3e792f3`](https://github.com/apache/spark/commit/3e792f3569d7a397e2817ac3b66816a3c35feed0).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-173417159
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-177027561
  
    **[Test build #50420 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50420/consoleFull)** for PR 10855 at commit [`caad24f`](https://github.com/apache/spark/commit/caad24f414fde59e8563d05eba76fa5cf448ddf6).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176974831
  
    LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-177035314
  
    **[Test build #2483 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2483/consoleFull)** for PR 10855 at commit [`caad24f`](https://github.com/apache/spark/commit/caad24f414fde59e8563d05eba76fa5cf448ddf6).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-177025384
  
    **[Test build #2482 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2482/consoleFull)** for PR 10855 at commit [`940c88d`](https://github.com/apache/spark/commit/940c88df74be1f7cc9f62aa19c17dda31266cce4).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-177010443
  
    **[Test build #2482 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2482/consoleFull)** for PR 10855 at commit [`940c88d`](https://github.com/apache/spark/commit/940c88df74be1f7cc9f62aa19c17dda31266cce4).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-173461724
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49855/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-175459406
  
    **[Test build #50178 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50178/consoleFull)** for PR 10855 at commit [`48e125c`](https://github.com/apache/spark/commit/48e125cd7623af1af2b9f82b9ca8ddeca438ad17).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-173417160
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49829/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-174791573
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-174791518
  
    **[Test build #50063 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50063/consoleFull)** for PR 10855 at commit [`9a42b52`](https://github.com/apache/spark/commit/9a42b522cb483a3502ee36cea6672c36f2e40b46).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `      class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator `


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176926490
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176951729
  
    @davies Can you include the generated output?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10855#discussion_r50915162
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala ---
    @@ -42,9 +43,14 @@ trait CodegenSupport extends SparkPlan {
       private var parent: CodegenSupport = null
     
       /**
    +    * Returns the RDD of InternalRow which generates the input rows.
    +    */
    +  def upstream(): RDD[InternalRow]
    +
    +  /**
         * Returns an input RDD of InternalRow and Java source code to process them.
    --- End diff --
    
    Update comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10855#discussion_r51310284
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala ---
    @@ -114,22 +116,38 @@ case class TungstenAggregate(
         }
       }
     
    +  // all the mode of aggregate expressions
    +  private val modes = aggregateExpressions.map(_.mode).distinct
    +
       override def supportCodegen: Boolean = {
    -    groupingExpressions.isEmpty &&
    -      // ImperativeAggregate is not supported right now
    -      !aggregateExpressions.exists(_.aggregateFunction.isInstanceOf[ImperativeAggregate])
    +    // ImperativeAggregate is not supported right now
    +    !aggregateExpressions.exists(_.aggregateFunction.isInstanceOf[ImperativeAggregate])
       }
     
    -  // The variables used as aggregation buffer
    -  private var bufVars: Seq[ExprCode] = _
    -
    -  private val modes = aggregateExpressions.map(_.mode).distinct
    -
       override def upstream(): RDD[InternalRow] = {
         child.asInstanceOf[CodegenSupport].upstream()
       }
     
       protected override def doProduce(ctx: CodegenContext): String = {
    +    if (groupingExpressions.isEmpty) {
    +      doProduceWithoutKeys(ctx)
    +    } else {
    +      doProduceWithKeys(ctx)
    +    }
    +  }
    +
    +  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    +    if (groupingExpressions.isEmpty) {
    +      doConsumeWithoutKeys(ctx, input)
    +    } else {
    +      doConsumeWithKeys(ctx, input)
    +    }
    +  }
    +
    +  // The variables used as aggregation buffer
    +  private var bufVars: Seq[ExprCode] = _
    --- End diff --
    
    I find it weird you've split this up this way. Can you comment that this is only used when its grouping with no agg?
    
    Why do you manage this in side the TungstenAggregation class but create the hashmap so differently? They seem logically serving the same purpose.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10855#discussion_r50952319
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala ---
    @@ -137,61 +157,297 @@ case class TungstenAggregate(
         bufVars = initExpr.map { e =>
           val isNull = ctx.freshName("bufIsNull")
           val value = ctx.freshName("bufValue")
    +      ctx.addMutableState("boolean", isNull, "")
    +      ctx.addMutableState(ctx.javaType(e.dataType), value, "")
           // The initial expression should not access any column
           val ev = e.gen(ctx)
    -      val initVars = s"""
    -         | boolean $isNull = ${ev.isNull};
    -         | ${ctx.javaType(e.dataType)} $value = ${ev.value};
    -       """.stripMargin
    +      val initVars =
    +        s"""
    +         $isNull = ${ev.isNull};
    +         $value = ${ev.value};
    +       """
           ExprCode(ev.code + initVars, isNull, value)
         }
     
    -    val (rdd, childSource) = child.asInstanceOf[CodegenSupport].produce(ctx, this)
    -    val source =
    +    // generate variables for output
    +    val (resultVars, genResult) = if (modes.contains(Final) |modes.contains(Complete)) {
    +      // evaluate aggregate results
    +      ctx.currentVars = bufVars
    +      val bufferAttrs = functions.flatMap(_.aggBufferAttributes)
    +      val aggResults = functions.map(_.evaluateExpression).map { e =>
    +        BindReferences.bindReference(e, bufferAttrs).gen(ctx)
    +      }
    +      // evaluate result expressions
    +      ctx.currentVars = aggResults
    +      val resultVars = resultExpressions.map { e =>
    +        BindReferences.bindReference(e, aggregateAttributes).gen(ctx)
    +      }
    +      (resultVars, s"""
    +         ${aggResults.map(_.code).mkString("\n")}
    +         ${resultVars.map(_.code).mkString("\n")}
    +       """)
    +    } else {
    +      // output the aggregate buffer directly
    +      (bufVars, "")
    +    }
    +
    +    val doAgg = ctx.freshName("doAgg")
    +    ctx.addNewFunction(doAgg,
           s"""
    -         | if (!$initAgg) {
    -         |   $initAgg = true;
    -         |
    -         |   // initialize aggregation buffer
    -         |   ${bufVars.map(_.code).mkString("\n")}
    -         |
    -         |   $childSource
    -         |
    -         |   // output the result
    -         |   ${consume(ctx, bufVars)}
    -         | }
    -       """.stripMargin
    -
    -    (rdd, source)
    +        private void $doAgg() throws java.io.IOException {
    +          // initialize aggregation buffer
    +          ${bufVars.map(_.code).mkString("\n")}
    +
    +          ${child.asInstanceOf[CodegenSupport].produce(ctx, this)}
    +        }
    +       """)
    +
    +    s"""
    +    if (!$initAgg) {
    +      $initAgg = true;
    +      $doAgg();
    +
    +      // output the result
    +      $genResult
    +
    +      ${consume(ctx, resultVars)}
    +    }
    +    """
       }
     
    -  override def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String = {
    +  private def doConsumeWithoutKeys(
    +      ctx: CodegenContext,
    +      child: SparkPlan,
    +      input: Seq[ExprCode]): String = {
         // only have DeclarativeAggregate
         val functions = aggregateExpressions.map(_.aggregateFunction.asInstanceOf[DeclarativeAggregate])
    -    // the mode could be only Partial or PartialMerge
    -    val updateExpr = if (modes.contains(Partial)) {
    -      functions.flatMap(_.updateExpressions)
    +    val inputAttrs = functions.flatMap(_.aggBufferAttributes) ++ child.output
    +    val updateExpr = aggregateExpressions.flatMap { e =>
    +      e.mode match {
    +        case Partial | Complete =>
    +          e.aggregateFunction.asInstanceOf[DeclarativeAggregate].updateExpressions
    +        case PartialMerge | Final =>
    +          e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
    +      }
    +    }
    +    ctx.currentVars = bufVars ++ input
    +    // TODO: support subexpression elimination
    +    val updates = updateExpr.zipWithIndex.map { case (e, i) =>
    +      val ev = BindReferences.bindReference[Expression](e, inputAttrs).gen(ctx)
    +      s"""
    +       ${ev.code}
    +       ${bufVars(i).isNull} = ${ev.isNull};
    +       ${bufVars(i).value} = ${ev.value};
    +       """
    +    }
    +
    +    s"""
    +     // do aggregate and update aggregation buffer
    +     ${updates.mkString("")}
    +     """
    +  }
    +
    +  val groupingAttributes = groupingExpressions.map(_.toAttribute)
    +  val groupingKeySchema = StructType.fromAttributes(groupingAttributes)
    +  val declFunctions = aggregateExpressions.map(_.aggregateFunction)
    +    .filter(_.isInstanceOf[DeclarativeAggregate])
    +    .map(_.asInstanceOf[DeclarativeAggregate])
    +  val bufferAttributes = declFunctions.flatMap(_.aggBufferAttributes)
    +  val bufferSchema = StructType.fromAttributes(bufferAttributes)
    +
    +  // The name for HashMap
    +  var hashMapTerm: String = _
    +
    +  def createHashMap(): UnsafeFixedWidthAggregationMap = {
    +    // create initialized aggregate buffer
    +    val initExpr = declFunctions.flatMap(f => f.initialValues)
    +    val initialBuffer = UnsafeProjection.create(initExpr)(EmptyRow)
    +
    +    // create hashMap
    +    new UnsafeFixedWidthAggregationMap(
    +      initialBuffer,
    +      bufferSchema,
    +      groupingKeySchema,
    +      TaskContext.get().taskMemoryManager(),
    +      1024 * 16, // initial capacity
    +      TaskContext.get().taskMemoryManager().pageSizeBytes,
    +      false // disable tracking of performance metrics
    +    )
    +  }
    +
    +  def createUnsafeJoiner(): UnsafeRowJoiner = {
    +    GenerateUnsafeRowJoiner.create(groupingKeySchema, bufferSchema)
    +  }
    +
    +  private def doProduceWithKeys(ctx: CodegenContext): String = {
    +    val initAgg = ctx.freshName("initAgg")
    +    ctx.addMutableState("boolean", initAgg, s"$initAgg = false;")
    +
    +    // create hashMap
    +    val thisPlan = ctx.addReferenceObj("tungstenAggregate", this)
    +    hashMapTerm = ctx.freshName("hashMap")
    +    val hashMapClassName = classOf[UnsafeFixedWidthAggregationMap].getName
    +    ctx.addMutableState(hashMapClassName, hashMapTerm, s"$hashMapTerm = $thisPlan.createHashMap();")
    +
    +    // Create a name for iterator from HashMap
    +    val iterTerm = ctx.freshName("mapIter")
    +    ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName, iterTerm, "")
    +
    +    // generate code for output
    +    val keyTerm = ctx.freshName("aggKey")
    +    val bufferTerm = ctx.freshName("aggBuffer")
    +    val outputCode = if (modes.contains(Final) |modes.contains(Complete)) {
    +      // generate output using resultExpressions
    +      ctx.currentVars = null
    +      ctx.INPUT_ROW = keyTerm
    +      val keyVars = groupingExpressions.zipWithIndex.map { case (e, i) =>
    +          BoundReference(i, e.dataType, e.nullable).gen(ctx)
    +      }
    +      ctx.INPUT_ROW = bufferTerm
    +      val bufferVars = bufferAttributes.zipWithIndex.map { case (e, i) =>
    +        BoundReference(i, e.dataType, e.nullable).gen(ctx)
    +      }
    +      // evaluate the aggregation result
    +      ctx.currentVars = bufferVars
    +      val aggResults = declFunctions.map(_.evaluateExpression).map { e =>
    +        BindReferences.bindReference(e, bufferAttributes).gen(ctx)
    +      }
    +      // generate the final result
    +      ctx.currentVars = keyVars ++ aggResults
    +      val inputAttrs = groupingAttributes ++ aggregateAttributes
    +      val resultVars = resultExpressions.map { e =>
    +        BindReferences.bindReference(e, inputAttrs).gen(ctx)
    +      }
    +      s"""
    +       ${keyVars.map(_.code).mkString("\n")}
    +       ${bufferVars.map(_.code).mkString("\n")}
    +       ${aggResults.map(_.code).mkString("\n")}
    +       ${resultVars.map(_.code).mkString("\n")}
    +
    +       ${consume(ctx, resultVars)}
    +       """
    +
    +    } else if (modes.contains(Partial) |modes.contains(PartialMerge)) {
    +      // This should be the last operator in a stage, we should output UnsafeRow directly
    +      val joinerTerm = ctx.freshName("unsafeRowJoiner")
    +      ctx.addMutableState(classOf[UnsafeRowJoiner].getName, joinerTerm,
    +        s"$joinerTerm = $thisPlan.createUnsafeJoiner();")
    +      val resultRow = ctx.freshName("resultRow")
    +      s"""
    +       UnsafeRow $resultRow = $joinerTerm.join($keyTerm, $bufferTerm);
    +       ${consume(ctx, null, resultRow)}
    +       """
    +
         } else {
    -      functions.flatMap(_.mergeExpressions)
    +      // only grouping key
    +      ctx.INPUT_ROW = keyTerm
    +      ctx.currentVars = null
    +      val eval = resultExpressions.map{ e =>
    +        BindReferences.bindReference(e, groupingAttributes).gen(ctx)
    +      }
    +      s"""
    +       ${eval.map(_.code).mkString("\n")}
    +       ${consume(ctx, eval)}
    +       """
    +    }
    +
    +    val doAgg = ctx.freshName("doAgg")
    +    ctx.addNewFunction(doAgg,
    +      s"""
    +        private void $doAgg() throws java.io.IOException {
    +          ${child.asInstanceOf[CodegenSupport].produce(ctx, this)}
    +
    +          $iterTerm = $hashMapTerm.iterator();
    +        }
    +       """)
    +
    +    s"""
    +     if (!$initAgg) {
    +       $initAgg = true;
    +       $doAgg();
    +     }
    +
    +     // output the result
    +     while ($iterTerm.next()) {
    +       UnsafeRow $keyTerm = (UnsafeRow) $iterTerm.getKey();
    --- End diff --
    
    We need this to generate output


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-174783576
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-173458567
  
    **[Test build #49855 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49855/consoleFull)** for PR 10855 at commit [`7d1bd43`](https://github.com/apache/spark/commit/7d1bd43aafd7c38120b9508830e7a22db11371b4).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176971258
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50400/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176940271
  
    **[Test build #2475 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2475/consoleFull)** for PR 10855 at commit [`d3c2406`](https://github.com/apache/spark/commit/d3c240692bc802d28646e5e8a9c96471fd500952).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-175457609
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-174784317
  
    **[Test build #50063 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50063/consoleFull)** for PR 10855 at commit [`9a42b52`](https://github.com/apache/spark/commit/9a42b522cb483a3502ee36cea6672c36f2e40b46).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176970907
  
    **[Test build #50400 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50400/consoleFull)** for PR 10855 at commit [`d3c2406`](https://github.com/apache/spark/commit/d3c240692bc802d28646e5e8a9c96471fd500952).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-174782556
  
    **[Test build #50060 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50060/consoleFull)** for PR 10855 at commit [`9cc7925`](https://github.com/apache/spark/commit/9cc79254c38d97fe136d181518356c650114536f).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `      class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator `


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-175479794
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50178/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10855#discussion_r50919705
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala ---
    @@ -137,61 +157,297 @@ case class TungstenAggregate(
         bufVars = initExpr.map { e =>
           val isNull = ctx.freshName("bufIsNull")
           val value = ctx.freshName("bufValue")
    +      ctx.addMutableState("boolean", isNull, "")
    +      ctx.addMutableState(ctx.javaType(e.dataType), value, "")
           // The initial expression should not access any column
           val ev = e.gen(ctx)
    -      val initVars = s"""
    -         | boolean $isNull = ${ev.isNull};
    -         | ${ctx.javaType(e.dataType)} $value = ${ev.value};
    -       """.stripMargin
    +      val initVars =
    +        s"""
    +         $isNull = ${ev.isNull};
    +         $value = ${ev.value};
    +       """
           ExprCode(ev.code + initVars, isNull, value)
         }
     
    -    val (rdd, childSource) = child.asInstanceOf[CodegenSupport].produce(ctx, this)
    -    val source =
    +    // generate variables for output
    +    val (resultVars, genResult) = if (modes.contains(Final) |modes.contains(Complete)) {
    +      // evaluate aggregate results
    +      ctx.currentVars = bufVars
    +      val bufferAttrs = functions.flatMap(_.aggBufferAttributes)
    +      val aggResults = functions.map(_.evaluateExpression).map { e =>
    +        BindReferences.bindReference(e, bufferAttrs).gen(ctx)
    +      }
    +      // evaluate result expressions
    +      ctx.currentVars = aggResults
    +      val resultVars = resultExpressions.map { e =>
    +        BindReferences.bindReference(e, aggregateAttributes).gen(ctx)
    +      }
    +      (resultVars, s"""
    +         ${aggResults.map(_.code).mkString("\n")}
    +         ${resultVars.map(_.code).mkString("\n")}
    +       """)
    +    } else {
    +      // output the aggregate buffer directly
    +      (bufVars, "")
    +    }
    +
    +    val doAgg = ctx.freshName("doAgg")
    +    ctx.addNewFunction(doAgg,
           s"""
    -         | if (!$initAgg) {
    -         |   $initAgg = true;
    -         |
    -         |   // initialize aggregation buffer
    -         |   ${bufVars.map(_.code).mkString("\n")}
    -         |
    -         |   $childSource
    -         |
    -         |   // output the result
    -         |   ${consume(ctx, bufVars)}
    -         | }
    -       """.stripMargin
    -
    -    (rdd, source)
    +        private void $doAgg() throws java.io.IOException {
    +          // initialize aggregation buffer
    +          ${bufVars.map(_.code).mkString("\n")}
    +
    +          ${child.asInstanceOf[CodegenSupport].produce(ctx, this)}
    +        }
    +       """)
    +
    +    s"""
    +    if (!$initAgg) {
    +      $initAgg = true;
    +      $doAgg();
    +
    +      // output the result
    +      $genResult
    +
    +      ${consume(ctx, resultVars)}
    +    }
    +    """
       }
     
    -  override def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String = {
    +  private def doConsumeWithoutKeys(
    +      ctx: CodegenContext,
    +      child: SparkPlan,
    +      input: Seq[ExprCode]): String = {
         // only have DeclarativeAggregate
         val functions = aggregateExpressions.map(_.aggregateFunction.asInstanceOf[DeclarativeAggregate])
    -    // the mode could be only Partial or PartialMerge
    -    val updateExpr = if (modes.contains(Partial)) {
    -      functions.flatMap(_.updateExpressions)
    +    val inputAttrs = functions.flatMap(_.aggBufferAttributes) ++ child.output
    +    val updateExpr = aggregateExpressions.flatMap { e =>
    +      e.mode match {
    +        case Partial | Complete =>
    +          e.aggregateFunction.asInstanceOf[DeclarativeAggregate].updateExpressions
    +        case PartialMerge | Final =>
    +          e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
    +      }
    +    }
    +    ctx.currentVars = bufVars ++ input
    +    // TODO: support subexpression elimination
    +    val updates = updateExpr.zipWithIndex.map { case (e, i) =>
    +      val ev = BindReferences.bindReference[Expression](e, inputAttrs).gen(ctx)
    +      s"""
    +       ${ev.code}
    +       ${bufVars(i).isNull} = ${ev.isNull};
    +       ${bufVars(i).value} = ${ev.value};
    +       """
    +    }
    +
    +    s"""
    +     // do aggregate and update aggregation buffer
    +     ${updates.mkString("")}
    +     """
    +  }
    +
    +  val groupingAttributes = groupingExpressions.map(_.toAttribute)
    +  val groupingKeySchema = StructType.fromAttributes(groupingAttributes)
    +  val declFunctions = aggregateExpressions.map(_.aggregateFunction)
    +    .filter(_.isInstanceOf[DeclarativeAggregate])
    +    .map(_.asInstanceOf[DeclarativeAggregate])
    +  val bufferAttributes = declFunctions.flatMap(_.aggBufferAttributes)
    +  val bufferSchema = StructType.fromAttributes(bufferAttributes)
    +
    +  // The name for HashMap
    +  var hashMapTerm: String = _
    +
    +  def createHashMap(): UnsafeFixedWidthAggregationMap = {
    +    // create initialized aggregate buffer
    +    val initExpr = declFunctions.flatMap(f => f.initialValues)
    +    val initialBuffer = UnsafeProjection.create(initExpr)(EmptyRow)
    +
    +    // create hashMap
    +    new UnsafeFixedWidthAggregationMap(
    +      initialBuffer,
    +      bufferSchema,
    +      groupingKeySchema,
    +      TaskContext.get().taskMemoryManager(),
    +      1024 * 16, // initial capacity
    +      TaskContext.get().taskMemoryManager().pageSizeBytes,
    +      false // disable tracking of performance metrics
    +    )
    +  }
    +
    +  def createUnsafeJoiner(): UnsafeRowJoiner = {
    +    GenerateUnsafeRowJoiner.create(groupingKeySchema, bufferSchema)
    +  }
    +
    +  private def doProduceWithKeys(ctx: CodegenContext): String = {
    +    val initAgg = ctx.freshName("initAgg")
    +    ctx.addMutableState("boolean", initAgg, s"$initAgg = false;")
    +
    +    // create hashMap
    +    val thisPlan = ctx.addReferenceObj("tungstenAggregate", this)
    +    hashMapTerm = ctx.freshName("hashMap")
    +    val hashMapClassName = classOf[UnsafeFixedWidthAggregationMap].getName
    +    ctx.addMutableState(hashMapClassName, hashMapTerm, s"$hashMapTerm = $thisPlan.createHashMap();")
    +
    +    // Create a name for iterator from HashMap
    +    val iterTerm = ctx.freshName("mapIter")
    +    ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName, iterTerm, "")
    +
    +    // generate code for output
    +    val keyTerm = ctx.freshName("aggKey")
    +    val bufferTerm = ctx.freshName("aggBuffer")
    +    val outputCode = if (modes.contains(Final) |modes.contains(Complete)) {
    +      // generate output using resultExpressions
    +      ctx.currentVars = null
    +      ctx.INPUT_ROW = keyTerm
    +      val keyVars = groupingExpressions.zipWithIndex.map { case (e, i) =>
    +          BoundReference(i, e.dataType, e.nullable).gen(ctx)
    +      }
    +      ctx.INPUT_ROW = bufferTerm
    +      val bufferVars = bufferAttributes.zipWithIndex.map { case (e, i) =>
    +        BoundReference(i, e.dataType, e.nullable).gen(ctx)
    +      }
    +      // evaluate the aggregation result
    +      ctx.currentVars = bufferVars
    +      val aggResults = declFunctions.map(_.evaluateExpression).map { e =>
    +        BindReferences.bindReference(e, bufferAttributes).gen(ctx)
    +      }
    +      // generate the final result
    +      ctx.currentVars = keyVars ++ aggResults
    +      val inputAttrs = groupingAttributes ++ aggregateAttributes
    +      val resultVars = resultExpressions.map { e =>
    +        BindReferences.bindReference(e, inputAttrs).gen(ctx)
    +      }
    +      s"""
    +       ${keyVars.map(_.code).mkString("\n")}
    +       ${bufferVars.map(_.code).mkString("\n")}
    +       ${aggResults.map(_.code).mkString("\n")}
    +       ${resultVars.map(_.code).mkString("\n")}
    +
    +       ${consume(ctx, resultVars)}
    +       """
    +
    +    } else if (modes.contains(Partial) |modes.contains(PartialMerge)) {
    +      // This should be the last operator in a stage, we should output UnsafeRow directly
    +      val joinerTerm = ctx.freshName("unsafeRowJoiner")
    +      ctx.addMutableState(classOf[UnsafeRowJoiner].getName, joinerTerm,
    +        s"$joinerTerm = $thisPlan.createUnsafeJoiner();")
    +      val resultRow = ctx.freshName("resultRow")
    +      s"""
    +       UnsafeRow $resultRow = $joinerTerm.join($keyTerm, $bufferTerm);
    +       ${consume(ctx, null, resultRow)}
    +       """
    +
         } else {
    -      functions.flatMap(_.mergeExpressions)
    +      // only grouping key
    +      ctx.INPUT_ROW = keyTerm
    +      ctx.currentVars = null
    +      val eval = resultExpressions.map{ e =>
    +        BindReferences.bindReference(e, groupingAttributes).gen(ctx)
    +      }
    +      s"""
    +       ${eval.map(_.code).mkString("\n")}
    +       ${consume(ctx, eval)}
    +       """
    +    }
    +
    +    val doAgg = ctx.freshName("doAgg")
    +    ctx.addNewFunction(doAgg,
    +      s"""
    +        private void $doAgg() throws java.io.IOException {
    +          ${child.asInstanceOf[CodegenSupport].produce(ctx, this)}
    +
    +          $iterTerm = $hashMapTerm.iterator();
    +        }
    +       """)
    +
    +    s"""
    +     if (!$initAgg) {
    +       $initAgg = true;
    +       $doAgg();
    +     }
    +
    +     // output the result
    +     while ($iterTerm.next()) {
    +       UnsafeRow $keyTerm = (UnsafeRow) $iterTerm.getKey();
    +       UnsafeRow $bufferTerm = (UnsafeRow) $iterTerm.getValue();
    +       $outputCode
    +     }
    +
    +     $hashMapTerm.free();
    +     """
    +  }
    +
    +  private def doConsumeWithKeys(
    +      ctx: CodegenContext,
    +      child: SparkPlan,
    +      input: Seq[ExprCode]): String = {
    +
    +    // create grouping key
    +    ctx.currentVars = input
    +    val keyCode = GenerateUnsafeProjection.createCode(
    +      ctx, groupingExpressions.map(e => BindReferences.bindReference[Expression](e, child.output)))
    +    val key = keyCode.value
    +    val buffer = ctx.freshName("aggBuffer")
    +
    +    // only have DeclarativeAggregate
    +    val updateExpr = aggregateExpressions.flatMap { e =>
    +      e.mode match {
    +        case Partial | Complete =>
    +          e.aggregateFunction.asInstanceOf[DeclarativeAggregate].updateExpressions
    +        case PartialMerge | Final =>
    +          e.aggregateFunction.asInstanceOf[DeclarativeAggregate].mergeExpressions
    +      }
         }
     
    -    val inputAttr = functions.flatMap(_.aggBufferAttributes) ++ child.output
    +    val inputAttr = bufferAttributes ++ child.output
         val boundExpr = updateExpr.map(e => BindReferences.bindReference(e, inputAttr))
    -    ctx.currentVars = bufVars ++ input
    +    ctx.currentVars = new Array[ExprCode](bufferAttributes.length) ++ input
    +    ctx.INPUT_ROW = buffer
         // TODO: support subexpression elimination
    -    val codes = boundExpr.zipWithIndex.map { case (e, i) =>
    -      val ev = e.gen(ctx)
    -      s"""
    -         | ${ev.code}
    -         | ${bufVars(i).isNull} = ${ev.isNull};
    -         | ${bufVars(i).value} = ${ev.value};
    -       """.stripMargin
    +    val evals = boundExpr.map(_.gen(ctx))
    +    val updates = evals.zipWithIndex.map { case (ev, i) =>
    +      val dt = updateExpr(i).dataType
    +      if (updateExpr(i).nullable) {
    +        if (dt.isInstanceOf[DecimalType]) {
    --- End diff --
    
    Can we move this to ctx.
    
    setColumn(buffer, dt, i, ev) that handles all this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176896266
  
    **[Test build #50389 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50389/consoleFull)** for PR 10855 at commit [`858c1e3`](https://github.com/apache/spark/commit/858c1e3b0ce20dad0ee23d2444267f5a640fac2a).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176498689
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-177013809
  
    **[Test build #2479 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2479/consoleFull)** for PR 10855 at commit [`940c88d`](https://github.com/apache/spark/commit/940c88df74be1f7cc9f62aa19c17dda31266cce4).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176987979
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-175479792
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-176993422
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12914] [SQL] generate aggregation with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10855#issuecomment-173417117
  
    **[Test build #49829 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49829/consoleFull)** for PR 10855 at commit [`3e792f3`](https://github.com/apache/spark/commit/3e792f3569d7a397e2817ac3b66816a3c35feed0).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `        class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator `


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org