You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/03/27 00:35:14 UTC

[spark] branch branch-3.1 updated: [SPARK-38528][SQL][3.2] Eagerly iterate over aggregate sequence when building project list in `ExtractGenerator`

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new b3e3110  [SPARK-38528][SQL][3.2] Eagerly iterate over aggregate sequence when building project list in `ExtractGenerator`
b3e3110 is described below

commit b3e31107277cea5e441eb3708535e740712027a6
Author: Bruce Robbins <be...@gmail.com>
AuthorDate: Sun Mar 27 09:31:49 2022 +0900

    [SPARK-38528][SQL][3.2] Eagerly iterate over aggregate sequence when building project list in `ExtractGenerator`
    
    Backport of #35837.
    
    When building the project list from an aggregate sequence in `ExtractGenerator`, convert the aggregate sequence to an `IndexedSeq` before performing the flatMap operation.
    
    This query fails with a `NullPointerException`:
    ```
    val df = Seq(1, 2, 3).toDF("v")
    df.select(Stream(explode(array(min($"v"), max($"v"))), sum($"v")): _*).collect
    ```
    If you change `Stream` to `Seq`, then it succeeds.
    
    `ExtractGenerator` uses a flatMap operation over `aggList` for two purposes:
    
    - To produce a new aggregate list
    - to update `projectExprs` (which is initialized as an array of nulls).
    
    When `aggList` is a `Stream`, the flatMap operation evaluates lazily, so all entries in `projectExprs` after the first will still be null when the rule completes.
    
    Changing `aggList` to an `IndexedSeq` forces the flatMap to evaluate eagerly.
    
    No
    
    New unit test
    
    Closes #35851 from bersprockets/generator_aggregate_issue_32.
    
    Authored-by: Bruce Robbins <be...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit 7842621ff50001e1cde8e2e6a2fc48c2cdcaf3d4)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala    | 1 +
 .../test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala   | 7 +++++++
 2 files changed, 8 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ae5e2d1..aacc801 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2649,6 +2649,7 @@ class Analyzer(override val catalogManager: CatalogManager)
 
         val projectExprs = Array.ofDim[NamedExpression](aggList.length)
         val newAggList = aggList
+          .toIndexedSeq
           .map(trimNonTopLevelAliases)
           .zipWithIndex
           .flatMap {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
index 8f44903..5ac2383 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
@@ -356,6 +356,13 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
     val df = Seq(1, 2, 3).toDF("v")
     checkAnswer(df.select(explode(array(min($"v"), max($"v")))), Row(1) :: Row(3) :: Nil)
   }
+
+  test("SPARK-38528: generator in stream of aggregate expressions") {
+    val df = Seq(1, 2, 3).toDF("v")
+    checkAnswer(
+      df.select(Stream(explode(array(min($"v"), max($"v"))), sum($"v")): _*),
+      Row(1, 6) :: Row(3, 6) :: Nil)
+  }
 }
 
 case class EmptyGenerator() extends Generator {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org