Posted to reviews@spark.apache.org by cloud-fan <gi...@git.apache.org> on 2017/12/03 16:12:27 UTC

[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

GitHub user cloud-fan opened a pull request:

    https://github.com/apache/spark/pull/19869

    [SPARK-22677][SQL] cleanup whole stage codegen for hash aggregate

    ## What changes were proposed in this pull request?
    
    The `HashAggregateExec` whole-stage codegen path is a little messy and hard to understand. This PR cleans it up a bit, especially the fast hash map part.
    
    ## How was this patch tested?
    
    existing tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark hash-agg

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19869.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19869
    
----
commit 174f4ec2ea000de01da6f494db366dc3ff58ccc4
Author: Wenchen Fan <we...@databricks.com>
Date:   2017-11-24T12:43:37Z

    cleanup whole stage codegen for hash aggregate

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    **[Test build #84432 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84432/testReport)** for PR 19869 at commit [`b1acd35`](https://github.com/apache/spark/commit/b1acd35d24ea449a97434b54759cf8ed441661f2).


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    LGTM


---



[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19869#discussion_r154580311
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -596,7 +596,7 @@ case class HashAggregateExec(
             ctx.addMutableState(fastHashMapClassName, fastHashMapTerm,
               s"$fastHashMapTerm = new $fastHashMapClassName();")
             ctx.addMutableState(
    -          classOf[java.util.Iterator[ColumnarRow]].getName,
    +          s"java.util.Iterator<${classOf[ColumnarRow]}>",
    --- End diff --
    
    ```scala
    scala> s"java.util.Iterator<${classOf[ColumnarRow]}>"
    res2: String = java.util.Iterator<class org.apache.spark.sql.execution.vectorized.ColumnarRow>
    ```
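
The REPL output above shows why the interpolated type string is not valid Java: interpolating a `Class` object calls `Class.toString`, which prefixes the name with `class `. A minimal sketch of the difference, using `java.lang.String` as a stand-in for `ColumnarRow` (an assumption made only so the snippet runs without Spark on the classpath):

```scala
object ClassNameInterpolation {
  // Interpolating a Class object calls Class.toString, which adds a "class " prefix.
  val viaToString: String = s"java.util.Iterator<${classOf[java.lang.String]}>"

  // Class.getName returns only the fully qualified name, which is usable in Java source.
  val viaGetName: String = s"java.util.Iterator<${classOf[java.lang.String].getName}>"

  // Type arguments are erased at runtime, so getName on the parameterized
  // iterator type yields just the raw class name.
  val erased: String = classOf[java.util.Iterator[java.lang.String]].getName

  def main(args: Array[String]): Unit = {
    println(viaToString) // java.util.Iterator<class java.lang.String>
    println(viaGetName)  // java.util.Iterator<java.lang.String>
    println(erased)      // java.util.Iterator
  }
}
```

If the generic type is wanted in the generated code, appending `.getName` inside the interpolation is probably the smallest fix.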


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    **[Test build #84435 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84435/testReport)** for PR 19869 at commit [`f0320d5`](https://github.com/apache/spark/commit/f0320d5f7b3406334a5250ecc2d9c5248142f34d).


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84416/
    Test FAILed.


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    LGTM except for some super minor comments...


---



[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19869


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19869#discussion_r154562413
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -672,48 +668,56 @@ case class HashAggregateExec(
     
         def outputFromRowBasedMap: String = {
           s"""
    -       while ($iterTermForFastHashMap.next()) {
    -         $numOutput.add(1);
    -         UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
    -         UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
    -         $outputFunc($keyTerm, $bufferTerm);
    -
    -         if (shouldStop()) return;
    -       }
    -       $fastHashMapTerm.close();
    -     """
    +         |while ($iterTermForFastHashMap.next()) {
    --- End diff --
    
    super nit: can we also drop the unnecessary leading spaces after the margin character in this file? e.g., change
    ```
          s"""
             | private void $doAgg() throws java.io.IOException {
    ```
    to
    ```
          s"""
             |private void $doAgg() throws java.io.IOException {
    ```
    https://github.com/cloud-fan/spark/blob/9b8ae3d6635c5ed0323bf088e20d0de55dd1c098/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L233
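
For context, `stripMargin` removes leading whitespace up to and including the `|` character, so any space typed after the `|` ends up in the generated source. A small sketch of the effect follows; the value of `doAgg` is a hypothetical stand-in for the fresh name that `ctx.freshName` would produce:

```scala
object StripMarginSpacing {
  // Hypothetical stand-in for the freshly generated method name.
  val doAgg = "doAggregateWithKeys"

  // A space after the margin character survives stripMargin as a leading space.
  val withSpace: String =
    s"""
       | private void $doAgg() throws java.io.IOException {
       | }
     """.stripMargin

  // With no space after the margin character, generated lines start at column zero.
  val withoutSpace: String =
    s"""
       |private void $doAgg() throws java.io.IOException {
       |}
     """.stripMargin

  def main(args: Array[String]): Unit = {
    // Index 0 is the empty line right after the opening quotes.
    println("[" + withSpace.split("\n")(1) + "]")
    println("[" + withoutSpace.split("\n")(1) + "]")
  }
}
```

The difference is cosmetic for the compiled result, but it keeps the generated Java consistently indented when it is dumped for debugging.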


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    **[Test build #84416 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84416/testReport)** for PR 19869 at commit [`479fd8d`](https://github.com/apache/spark/commit/479fd8d4cb6ba6f0126d996362125746dfeac8f2).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19869#discussion_r154528431
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -672,48 +668,56 @@ case class HashAggregateExec(
     
         def outputFromRowBasedMap: String = {
           s"""
    -       while ($iterTermForFastHashMap.next()) {
    -         $numOutput.add(1);
    -         UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
    -         UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
    -         $outputFunc($keyTerm, $bufferTerm);
    -
    -         if (shouldStop()) return;
    -       }
    -       $fastHashMapTerm.close();
    -     """
    +         |while ($iterTermForFastHashMap.next()) {
    +         |  UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
    +         |  UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
    +         |  $outputFunc($keyTerm, $bufferTerm);
    +         |
    +         |  if (shouldStop()) return;
    +         |}
    +         |$fastHashMapTerm.close();
    +       """.stripMargin
         }
     
         // Iterate over the aggregate rows and convert them from ColumnarRow to UnsafeRow
         def outputFromVectorizedMap: String = {
             val row = ctx.freshName("fastHashMapRow")
             ctx.currentVars = null
             ctx.INPUT_ROW = row
    -        val generateKeyRow = GenerateUnsafeProjection.createCode(ctx,
    -          groupingKeySchema.toAttributes.zipWithIndex
    +      val generateKeyRow = GenerateUnsafeProjection.createCode(ctx,
    +        groupingKeySchema.toAttributes.zipWithIndex
               .map { case (attr, i) => BoundReference(i, attr.dataType, attr.nullable) }
    -        )
    -        val generateBufferRow = GenerateUnsafeProjection.createCode(ctx,
    -          bufferSchema.toAttributes.zipWithIndex
    -          .map { case (attr, i) =>
    -            BoundReference(groupingKeySchema.length + i, attr.dataType, attr.nullable) })
    -        s"""
    -           | while ($iterTermForFastHashMap.hasNext()) {
    -           |   $numOutput.add(1);
    -           |   org.apache.spark.sql.execution.vectorized.ColumnarRow $row =
    -           |     (org.apache.spark.sql.execution.vectorized.ColumnarRow)
    -           |     $iterTermForFastHashMap.next();
    -           |   ${generateKeyRow.code}
    -           |   ${generateBufferRow.code}
    -           |   $outputFunc(${generateKeyRow.value}, ${generateBufferRow.value});
    -           |
    -           |   if (shouldStop()) return;
    -           | }
    -           |
    -           | $fastHashMapTerm.close();
    -         """.stripMargin
    +      )
    +      val generateBufferRow = GenerateUnsafeProjection.createCode(ctx,
    +        bufferSchema.toAttributes.zipWithIndex.map { case (attr, i) =>
    +          BoundReference(groupingKeySchema.length + i, attr.dataType, attr.nullable)
    +        })
    +      val columnarRowCls = classOf[ColumnarRow].getName
    +      s"""
    +         |while ($iterTermForFastHashMap.hasNext()) {
    +         |  $columnarRowCls $row = ($columnarRowCls) $iterTermForFastHashMap.next();
    +         |  ${generateKeyRow.code}
    +         |  ${generateBufferRow.code}
    +         |  $outputFunc(${generateKeyRow.value}, ${generateBufferRow.value});
    +         |
    +         |  if (shouldStop()) return;
    +         |}
    +         |
    +         |$fastHashMapTerm.close();
    +       """.stripMargin
         }
     
    +    def outputFromRegularHashMap: String = {
    +      s"""
    +         |while ($iterTerm.next()) {
    --- End diff --
    
    moved from https://github.com/apache/spark/pull/19869/files#diff-2eb948516b5beaeb746aadac27fbd5b4L731


---



[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19869#discussion_r154528409
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -672,48 +668,56 @@ case class HashAggregateExec(
     
         def outputFromRowBasedMap: String = {
           s"""
    -       while ($iterTermForFastHashMap.next()) {
    -         $numOutput.add(1);
    -         UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
    -         UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
    -         $outputFunc($keyTerm, $bufferTerm);
    -
    -         if (shouldStop()) return;
    -       }
    -       $fastHashMapTerm.close();
    -     """
    +         |while ($iterTermForFastHashMap.next()) {
    +         |  UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
    +         |  UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
    +         |  $outputFunc($keyTerm, $bufferTerm);
    +         |
    +         |  if (shouldStop()) return;
    +         |}
    +         |$fastHashMapTerm.close();
    +       """.stripMargin
         }
     
         // Iterate over the aggregate rows and convert them from ColumnarRow to UnsafeRow
         def outputFromVectorizedMap: String = {
             val row = ctx.freshName("fastHashMapRow")
             ctx.currentVars = null
             ctx.INPUT_ROW = row
    -        val generateKeyRow = GenerateUnsafeProjection.createCode(ctx,
    -          groupingKeySchema.toAttributes.zipWithIndex
    --- End diff --
    
    The indentation was wrong previously.


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    thanks, merging to master!


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84435/
    Test PASSed.


---



[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19869#discussion_r154561232
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -621,34 +622,30 @@ case class HashAggregateExec(
         val iterTerm = ctx.freshName("mapIter")
         ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName, iterTerm)
     
    -    def generateGenerateCode(): String = {
    -      if (isFastHashMapEnabled) {
    -        if (isVectorizedHashMapEnabled) {
    -          s"""
    -               | ${fastHashMapGenerator.asInstanceOf[VectorizedHashMapGenerator].generate()}
    -          """.stripMargin
    -        } else {
    -          s"""
    -               | ${fastHashMapGenerator.asInstanceOf[RowBasedHashMapGenerator].generate()}
    -          """.stripMargin
    -        }
    -      } else ""
    +    if (isFastHashMapEnabled) {
    +      val generatedMap = if (isVectorizedHashMapEnabled) {
    +        fastHashMapGenerator.asInstanceOf[VectorizedHashMapGenerator].generate()
    +      } else {
    +        fastHashMapGenerator.asInstanceOf[RowBasedHashMapGenerator].generate()
    +      }
    +      ctx.addInnerClass(generatedMap)
         }
    -    ctx.addInnerClass(generateGenerateCode())
     
         val doAgg = ctx.freshName("doAggregateWithKeys")
         val peakMemory = metricTerm(ctx, "peakMemory")
         val spillSize = metricTerm(ctx, "spillSize")
         val avgHashProbe = metricTerm(ctx, "avgHashProbe")
    +    val finishFashHashMap = if (isFastHashMapEnabled) {
    --- End diff --
    
    nit: can we merge this branch with the branch above (line 625) for the hash map stuff?


---



[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19869#discussion_r154569741
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -768,12 +762,8 @@ case class HashAggregateExec(
     
         // generate hash code for key
         val hashExpr = Murmur3Hash(groupingExpressions, 42)
    -    ctx.currentVars = input
         val hashEval = BindReferences.bindReference(hashExpr, child.output).genCode(ctx)
     
    -    val inputAttr = aggregateBufferAttributes ++ child.output
    -    ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input
    -
         val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter,
         incCounter) = if (testFallbackStartsAt.isDefined) {
    --- End diff --
    
    I'll have another PR for this part and will reformat it then.


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84403/
    Test PASSed.


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    **[Test build #84416 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84416/testReport)** for PR 19869 at commit [`479fd8d`](https://github.com/apache/spark/commit/479fd8d4cb6ba6f0126d996362125746dfeac8f2).


---



[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19869#discussion_r154556638
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -672,48 +668,56 @@ case class HashAggregateExec(
     
         def outputFromRowBasedMap: String = {
           s"""
    -       while ($iterTermForFastHashMap.next()) {
    -         $numOutput.add(1);
    -         UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
    -         UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
    -         $outputFunc($keyTerm, $bufferTerm);
    -
    -         if (shouldStop()) return;
    -       }
    -       $fastHashMapTerm.close();
    -     """
    +         |while ($iterTermForFastHashMap.next()) {
    +         |  UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
    +         |  UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
    +         |  $outputFunc($keyTerm, $bufferTerm);
    +         |
    +         |  if (shouldStop()) return;
    +         |}
    +         |$fastHashMapTerm.close();
    +       """.stripMargin
         }
     
         // Iterate over the aggregate rows and convert them from ColumnarRow to UnsafeRow
         def outputFromVectorizedMap: String = {
             val row = ctx.freshName("fastHashMapRow")
    --- End diff --
    
    ah good catch!


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    **[Test build #84435 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84435/testReport)** for PR 19869 at commit [`f0320d5`](https://github.com/apache/spark/commit/f0320d5f7b3406334a5250ecc2d9c5248142f34d).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    **[Test build #84432 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84432/testReport)** for PR 19869 at commit [`b1acd35`](https://github.com/apache/spark/commit/b1acd35d24ea449a97434b54759cf8ed441661f2).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    **[Test build #84410 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84410/testReport)** for PR 19869 at commit [`9b8ae3d`](https://github.com/apache/spark/commit/9b8ae3d6635c5ed0323bf088e20d0de55dd1c098).


---



[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19869#discussion_r154566445
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -882,45 +851,65 @@ case class HashAggregateExec(
              |${evaluateVariables(unsafeRowBufferEvals)}
              |// update unsafe row buffer
              |${updateUnsafeRowBuffer.mkString("\n").trim}
    -           """.stripMargin
    +       """.stripMargin
         }
     
    +    val updateRowInHashMap: String = {
    +      if (isFastHashMapEnabled) {
    +        ctx.INPUT_ROW = fastRowBuffer
    +        val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr))
    +        val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
    +        val effectiveCodes = subExprs.codes.mkString("\n")
    +        val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
    +          boundUpdateExpr.map(_.genCode(ctx))
    +        }
    +        val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i) =>
    +          val dt = updateExpr(i).dataType
    +          ctx.updateColumn(
    +            fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorizedHashMapEnabled)
    +        }
    +
    +        // If fast hash map is on, we first generate code to update row in fast hash map, if the
    +        // previous loop up hit fast hash map. Otherwise, update row in regular hash map.
    +        s"""
    +           |if ($fastRowBuffer != null) {
    +           |  // common sub-expressions
    +           |  $effectiveCodes
    +           |  // evaluate aggregate function
    +           |  ${evaluateVariables(fastRowEvals)}
    +           |  // update fast row
    +           |  ${updateFastRow.mkString("\n").trim}
    +           |} else {
    +           |  $updateRowInRegularHashMap
    +           |}
    +       """.stripMargin
    +      } else {
    +        updateRowInRegularHashMap
    +      }
    +    }
    +
    +    val declareFastRowBuffer: String = if (isFastHashMapEnabled) {
    +      val rowType = if (isVectorizedHashMapEnabled) {
    +        classOf[MutableColumnarRow].getName
    +      } else {
    +        "UnsafeRow"
    +      }
    +      s"$rowType $fastRowBuffer = null;"
    +    } else ""
     
         // We try to do hash map based in-memory aggregation first. If there is not enough memory (the
         // hash map will return null for new key), we spill the hash map to disk to free memory, then
         // continue to do in-memory aggregation and spilling until all the rows had been processed.
         // Finally, sort the spilled aggregate buffers by key, and merge them together for same key.
         s"""
          UnsafeRow $unsafeRowBuffer = null;
    --- End diff --
    
    nit: How about this, just for consistency?
    ```
        s"""
         $declareRowBuffer
    
         $findOrInsertHashMap
    
         $incCounter
    
         $updateRowInHashMap
         """
    ```
    Then,
    ```
        val declareRowBuffer: String = if (isFastHashMapEnabled) {
          val rowType = if (isVectorizedHashMapEnabled) {
            classOf[MutableColumnarRow].getName
          } else {
            "UnsafeRow"
          }
          s"""
             |UnsafeRow $unsafeRowBuffer = null;
             |$rowType $fastRowBuffer = null;
           """.stripMargin
        } else {
          s"UnsafeRow $unsafeRowBuffer = null;"
        }
    ```
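
The suggestion above can be tried outside Spark with a rough, self-contained sketch. Everything here is an assumption for illustration: the flags are plain parameters, the buffer names are hypothetical defaults rather than `ctx.freshName` results, and the row type is a simple name instead of `classOf[MutableColumnarRow].getName`.

```scala
object DeclareRowBuffer {
  // Builds the Java declarations for the row buffers, depending on which hash maps are enabled.
  def declareRowBuffer(
      isFastHashMapEnabled: Boolean,
      isVectorizedHashMapEnabled: Boolean,
      unsafeRowBuffer: String = "unsafeRowAggBuffer", // hypothetical name
      fastRowBuffer: String = "fastAggBuffer"         // hypothetical name
  ): String = {
    if (isFastHashMapEnabled) {
      // Simplified: the real code would use the fully qualified class name.
      val rowType = if (isVectorizedHashMapEnabled) "MutableColumnarRow" else "UnsafeRow"
      s"""
         |UnsafeRow $unsafeRowBuffer = null;
         |$rowType $fastRowBuffer = null;
       """.stripMargin.trim
    } else {
      s"UnsafeRow $unsafeRowBuffer = null;"
    }
  }

  def main(args: Array[String]): Unit = {
    println(declareRowBuffer(isFastHashMapEnabled = true, isVectorizedHashMapEnabled = true))
    println(declareRowBuffer(isFastHashMapEnabled = false, isVectorizedHashMapEnabled = false))
  }
}
```

Keeping both declarations in one value means the final template only interpolates terms, which seems to be the consistency the comment is after.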


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    **[Test build #84419 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84419/testReport)** for PR 19869 at commit [`0589e7d`](https://github.com/apache/spark/commit/0589e7d8b57aa71c72dd052f687a4706fb0c5567).


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84432/
    Test FAILed.


---



[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19869#discussion_r154576368
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -573,94 +574,84 @@ case class HashAggregateExec(
           enableTwoLevelHashMap(ctx)
         } else {
           sqlContext.getConf("spark.sql.codegen.aggregate.map.vectorized.enable", null) match {
    -        case "true" => logWarning("Two level hashmap is disabled but vectorized hashmap is " +
    -          "enabled.")
    -        case null | "" | "false" => None
    +        case "true" =>
    +          logWarning("Two level hashmap is disabled but vectorized hashmap is enabled.")
    +        case _ =>
           }
         }
    -    fastHashMapTerm = ctx.freshName("fastHashMap")
    -    val fastHashMapClassName = ctx.freshName("FastHashMap")
    -    val fastHashMapGenerator =
    -      if (isVectorizedHashMapEnabled) {
    -        new VectorizedHashMapGenerator(ctx, aggregateExpressions,
    -          fastHashMapClassName, groupingKeySchema, bufferSchema)
    -      } else {
    -        new RowBasedHashMapGenerator(ctx, aggregateExpressions,
    -          fastHashMapClassName, groupingKeySchema, bufferSchema)
    -      }
     
         val thisPlan = ctx.addReferenceObj("plan", this)
     
    -    // Create a name for iterator from vectorized HashMap
    +    // Create a name for the iterator from the fast hash map.
         val iterTermForFastHashMap = ctx.freshName("fastHashMapIter")
         if (isFastHashMapEnabled) {
    +      // Generates the fast hash map class and creates the fash hash map term.
    +      fastHashMapTerm = ctx.freshName("fastHashMap")
    +      val fastHashMapClassName = ctx.freshName("FastHashMap")
           if (isVectorizedHashMapEnabled) {
    +        val generatedMap = new VectorizedHashMapGenerator(ctx, aggregateExpressions,
    +          fastHashMapClassName, groupingKeySchema, bufferSchema).generate()
    +        ctx.addInnerClass(generatedMap)
    +
             ctx.addMutableState(fastHashMapClassName, fastHashMapTerm,
               s"$fastHashMapTerm = new $fastHashMapClassName();")
             ctx.addMutableState(
    -          "java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarRow>",
    +          classOf[java.util.Iterator[ColumnarRow]].getName,
    --- End diff --
    
    Is this the same as before? `getName` drops the type argument:
    
    ```scala
    scala> classOf[java.util.Iterator[Int]].getName
    res2: String = java.util.Iterator
    ```
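
    For illustration only, here is a minimal sketch of the same point without Spark on the classpath. `ErasureDemo` and `Row` are hypothetical stand-ins (`Row` plays the role of `ColumnarRow`). It shows that `classOf` only sees the erased class, that the generic Java type name has to be assembled by hand, and that interpolating a `Class` object directly goes through `toString`, which adds a `class ` prefix.

    ```scala
    object ErasureDemo {
      // Hypothetical stand-in for ColumnarRow so the sketch compiles without Spark.
      class Row

      // classOf only sees the erased class, so the type argument is lost.
      val erased: String = classOf[java.util.Iterator[Row]].getName

      // Assembling the Java type name by hand keeps the type argument.
      val full: String = s"java.util.Iterator<${classOf[Row].getName}>"

      // Interpolating the Class object itself calls toString ("class ..."),
      // which would not be a valid Java type name inside generated code.
      val viaToString: String = s"java.util.Iterator<${classOf[Row]}>"
    }
    ```

    Under these assumptions, only `full` would be usable as a declared type in generated Java source.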
    



---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    **[Test build #84403 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84403/testReport)** for PR 19869 at commit [`174f4ec`](https://github.com/apache/spark/commit/174f4ec2ea000de01da6f494db366dc3ff58ccc4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19869#discussion_r154528578
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -882,45 +851,65 @@ case class HashAggregateExec(
              |${evaluateVariables(unsafeRowBufferEvals)}
              |// update unsafe row buffer
              |${updateUnsafeRowBuffer.mkString("\n").trim}
    -           """.stripMargin
    +       """.stripMargin
         }
     
    +    val updateRowInHashMap: String = {
    +      if (isFastHashMapEnabled) {
    +        ctx.INPUT_ROW = fastRowBuffer
    +        val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr))
    +        val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
    +        val effectiveCodes = subExprs.codes.mkString("\n")
    +        val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
    +          boundUpdateExpr.map(_.genCode(ctx))
    +        }
    +        val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i) =>
    +          val dt = updateExpr(i).dataType
    +          ctx.updateColumn(
    +            fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorizedHashMapEnabled)
    +        }
    +
    +        // If fast hash map is on, we first generate code to update row in fast hash map, if the
    +        // previous loop up hit fast hash map. Otherwise, update row in regular hash map.
    +        s"""
    +           |if ($fastRowBuffer != null) {
    +           |  // common sub-expressions
    +           |  $effectiveCodes
    +           |  // evaluate aggregate function
    +           |  ${evaluateVariables(fastRowEvals)}
    +           |  // update fast row
    +           |  ${updateFastRow.mkString("\n").trim}
    +           |} else {
    +           |  $updateRowInRegularHashMap
    +           |}
    +       """.stripMargin
    +      } else {
    +        updateRowInRegularHashMap
    --- End diff --
    
    Previously we always declared the `fastRowBuffer` and had the `if (fastRowBuffer != null)` check. Now we don't generate them if the fast hash map is not enabled.
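
    A rough sketch of that shape, not the actual `HashAggregateExec` code: `ConditionalCodegenSketch`, `updateRowCode` and its parameters are hypothetical names, and the update fragments are passed in as plain strings. The null check is emitted only when the fast hash map is enabled; otherwise the regular update code is returned untouched.

    ```scala
    object ConditionalCodegenSketch {
      // All names here are hypothetical; this only mirrors the shape of the change.
      def updateRowCode(
          isFastHashMapEnabled: Boolean,
          fastRowBuffer: String,
          updateFastRow: String,
          updateRegularRow: String): String = {
        if (isFastHashMapEnabled) {
          // Fast hash map on: branch on whether the previous lookup hit it.
          s"""
             |if ($fastRowBuffer != null) {
             |  $updateFastRow
             |} else {
             |  $updateRegularRow
             |}
           """.stripMargin
        } else {
          // Fast hash map off: emit only the regular update, with no null check.
          updateRegularRow
        }
      }
    }
    ```

    With the flag off, the generated fragment never mentions the fast row buffer, so the variable does not need to be declared at all.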


---



[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19869#discussion_r154534333
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -672,48 +668,56 @@ case class HashAggregateExec(
     
         def outputFromRowBasedMap: String = {
           s"""
    -       while ($iterTermForFastHashMap.next()) {
    -         $numOutput.add(1);
    -         UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
    -         UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
    -         $outputFunc($keyTerm, $bufferTerm);
    -
    -         if (shouldStop()) return;
    -       }
    -       $fastHashMapTerm.close();
    -     """
    +         |while ($iterTermForFastHashMap.next()) {
    +         |  UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
    +         |  UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
    +         |  $outputFunc($keyTerm, $bufferTerm);
    +         |
    +         |  if (shouldStop()) return;
    +         |}
    +         |$fastHashMapTerm.close();
    +       """.stripMargin
         }
     
         // Iterate over the aggregate rows and convert them from ColumnarRow to UnsafeRow
         def outputFromVectorizedMap: String = {
             val row = ctx.freshName("fastHashMapRow")
    --- End diff --
    
    nit: Would it be better to fix the indentation of these three lines, too?


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    **[Test build #84419 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84419/testReport)** for PR 19869 at commit [`0589e7d`](https://github.com/apache/spark/commit/0589e7d8b57aa71c72dd052f687a4706fb0c5567).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19869#discussion_r154569858
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -768,12 +762,8 @@ case class HashAggregateExec(
     
         // generate hash code for key
         val hashExpr = Murmur3Hash(groupingExpressions, 42)
    -    ctx.currentVars = input
         val hashEval = BindReferences.bindReference(hashExpr, child.output).genCode(ctx)
     
    -    val inputAttr = aggregateBufferAttributes ++ child.output
    -    ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input
    -
         val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter,
         incCounter) = if (testFallbackStartsAt.isDefined) {
    --- End diff --
    
    ok, thanks


---



[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19869#discussion_r154528498
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -784,86 +774,65 @@ case class HashAggregateExec(
           ("true", "true", "", "")
         }
     
    -    // We first generate code to probe and update the fast hash map. If the probe is
    -    // successful the corresponding fast row buffer will hold the mutable row
    -    val findOrInsertFastHashMap: Option[String] = {
    +    val findOrInsertRegularHashMap: String =
    +      s"""
    +         |// generate grouping key
    +         |${unsafeRowKeyCode.code.trim}
    +         |${hashEval.code.trim}
    +         |if ($checkFallbackForBytesToBytesMap) {
    +         |  // try to get the buffer from hash map
    +         |  $unsafeRowBuffer =
    +         |    $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, ${hashEval.value});
    +         |}
    +         |// Can't allocate buffer from the hash map. Spill the map and fallback to sort-based
    +         |// aggregation after processing all input rows.
    +         |if ($unsafeRowBuffer == null) {
    +         |  if ($sorterTerm == null) {
    +         |    $sorterTerm = $hashMapTerm.destructAndCreateExternalSorter();
    +         |  } else {
    +         |    $sorterTerm.merge($hashMapTerm.destructAndCreateExternalSorter());
    +         |  }
    +         |  $resetCounter
    +         |  // the hash map had be spilled, it should have enough memory now,
    +         |  // try to allocate buffer again.
    +         |  $unsafeRowBuffer = $hashMapTerm.getAggregationBufferFromUnsafeRow(
    +         |    $unsafeRowKeys, ${hashEval.value});
    +         |  if ($unsafeRowBuffer == null) {
    +         |    // failed to allocate the first page
    +         |    throw new OutOfMemoryError("No enough memory for aggregation");
    +         |  }
    +         |}
    +       """.stripMargin
    +
    +    val findOrInsertHashMap: String = {
           if (isFastHashMapEnabled) {
    -        Option(
    -          s"""
    -             |
    -             |if ($checkFallbackForGeneratedHashMap) {
    -             |  ${fastRowKeys.map(_.code).mkString("\n")}
    -             |  if (${fastRowKeys.map("!" + _.isNull).mkString(" && ")}) {
    -             |    $fastRowBuffer = $fastHashMapTerm.findOrInsert(
    -             |        ${fastRowKeys.map(_.value).mkString(", ")});
    -             |  }
    -             |}
    -         """.stripMargin)
    +        // If fast hash map is on, we first generate code to probe and update the fast hash map.
    +        // If the probe is successful the corresponding fast row buffer will hold the mutable row.
    +        s"""
    +           |if ($checkFallbackForGeneratedHashMap) {
    --- End diff --
    
    moved from https://github.com/apache/spark/pull/19869/files#diff-2eb948516b5beaeb746aadac27fbd5b4L794


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    **[Test build #84410 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84410/testReport)** for PR 19869 at commit [`9b8ae3d`](https://github.com/apache/spark/commit/9b8ae3d6635c5ed0323bf088e20d0de55dd1c098).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84419/


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    **[Test build #84403 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84403/testReport)** for PR 19869 at commit [`174f4ec`](https://github.com/apache/spark/commit/174f4ec2ea000de01da6f494db366dc3ff58ccc4).


---



[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19869#discussion_r154565659
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -768,12 +762,8 @@ case class HashAggregateExec(
     
         // generate hash code for key
         val hashExpr = Murmur3Hash(groupingExpressions, 42)
    -    ctx.currentVars = input
         val hashEval = BindReferences.bindReference(hashExpr, child.output).genCode(ctx)
     
    -    val inputAttr = aggregateBufferAttributes ++ child.output
    -    ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input
    -
         val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter,
         incCounter) = if (testFallbackStartsAt.isDefined) {
    --- End diff --
    
    nit: Does this line need some indentation at the start?


---



[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19869#discussion_r154528386
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -444,6 +444,7 @@ case class HashAggregateExec(
         val funcName = ctx.freshName("doAggregateWithKeysOutput")
         val keyTerm = ctx.freshName("keyTerm")
         val bufferTerm = ctx.freshName("bufferTerm")
    +    val numOutput = metricTerm(ctx, "numOutputRows")
    --- End diff --
    
    Update `numOutputRows` in the result function instead of doing it separately for both the fast hash map and the regular hash map.
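
    As a hedged sketch of the idea (the object, method and term names below are placeholders, not the real code): the metric is incremented once inside the shared output function, and the loop that drains either map only calls that function.

    ```scala
    object SingleMetricUpdateSketch {
      // Hypothetical generated-code fragment: the shared result function
      // is the only place that increments the output-row metric.
      def outputFunction(funcName: String, numOutput: String, body: String): String =
        s"""
           |private void $funcName(UnsafeRow keyTerm, UnsafeRow bufferTerm) {
           |  $numOutput.add(1);
           |  $body
           |}
         """.stripMargin

      // The loop over a map iterator only calls the shared function
      // and never touches the metric itself.
      def outputLoop(iter: String, funcName: String): String =
        s"""
           |while ($iter.next()) {
           |  $funcName((UnsafeRow) $iter.getKey(), (UnsafeRow) $iter.getValue());
           |}
         """.stripMargin
    }
    ```

    Calling `outputLoop` once per map with the same `funcName` keeps the counting logic in a single place.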


---



[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19869#discussion_r154528455
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -784,86 +774,65 @@ case class HashAggregateExec(
           ("true", "true", "", "")
         }
     
    -    // We first generate code to probe and update the fast hash map. If the probe is
    -    // successful the corresponding fast row buffer will hold the mutable row
    -    val findOrInsertFastHashMap: Option[String] = {
    +    val findOrInsertRegularHashMap: String =
    --- End diff --
    
    moved from https://github.com/apache/spark/pull/19869/files#diff-2eb948516b5beaeb746aadac27fbd5b4L833


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    cc @juliuszsompolski @kiszk @viirya @maropu 


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19869
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84410/


---



[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19869#discussion_r154682180
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -596,7 +596,7 @@ case class HashAggregateExec(
             ctx.addMutableState(fastHashMapClassName, fastHashMapTerm,
               s"$fastHashMapTerm = new $fastHashMapClassName();")
             ctx.addMutableState(
    -          classOf[java.util.Iterator[ColumnarRow]].getName,
    +          s"java.util.Iterator<${classOf[ColumnarRow]}>",
    --- End diff --
    
    damn...


---
