You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yu...@apache.org on 2023/02/06 13:25:00 UTC

[spark] branch branch-3.3 updated: [SPARK-42346][SQL] Rewrite distinct aggregates after subquery merge

This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 17b71239fc1 [SPARK-42346][SQL] Rewrite distinct aggregates after subquery merge
17b71239fc1 is described below

commit 17b71239fc10d620ac41cbfc1a2a984adc08da82
Author: Peter Toth <pe...@gmail.com>
AuthorDate: Mon Feb 6 20:36:57 2023 +0800

    [SPARK-42346][SQL] Rewrite distinct aggregates after subquery merge
    
    ### What changes were proposed in this pull request?
    
    Unfortunately, https://github.com/apache/spark/pull/32298 introduced a regression from Spark 3.2 to 3.3: after that change, a merged subquery can contain multiple distinct-type aggregates. Those aggregates need to be rewritten by the `RewriteDistinctAggregates` rule to produce correct results. This PR fixes that.
    
    ### Why are the changes needed?
    The following query:
    ```
    SELECT
      (SELECT count(distinct c1) FROM t1),
      (SELECT count(distinct c2) FROM t1)
    ```
    currently fails with:
    ```
    java.lang.IllegalStateException: You hit a query analyzer bug. Please report your query to Spark user mailing list.
            at org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:538)
    ```
    but works again after this PR.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, the above query works again.
    
    ### How was this patch tested?
    Added a new unit test.
    
    Closes #39887 from peter-toth/SPARK-42346-rewrite-distinct-aggregates-after-subquery-merge.
    
    Authored-by: Peter Toth <pe...@gmail.com>
    Signed-off-by: Yuming Wang <yu...@ebay.com>
    (cherry picked from commit 5940b9884b4b172f65220da7857d2952b137bc51)
    Signed-off-by: Yuming Wang <yu...@ebay.com>
---
 .../spark/sql/execution/SparkOptimizer.scala       |  3 ++-
 .../scala/org/apache/spark/sql/SubquerySuite.scala | 25 ++++++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index b8861715726..416865976df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -55,7 +55,8 @@ class SparkOptimizer(
       InjectRuntimeFilter,
       RewritePredicateSubquery) :+
     Batch("MergeScalarSubqueries", Once,
-      MergeScalarSubqueries) :+
+      MergeScalarSubqueries,
+      RewriteDistinctAggregates) :+
     Batch("Pushdown Filters from PartitionPruning", fixedPoint,
       PushDownPredicates) :+
     Batch("Cleanup filters that cannot be pushed down", Once,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 0975772fb90..cedc68cfc84 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -2269,4 +2269,29 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       assert(findProject(df2).size == 3)
     }
   }
+
+  test("SPARK-42346: Rewrite distinct aggregates after merging subqueries") {
+    withTempView("t1") {
+      Seq((1, 2), (3, 4)).toDF("c1", "c2").createOrReplaceTempView("t1")
+
+      checkAnswer(sql(
+        """
+          |SELECT
+          |  (SELECT count(distinct c1) FROM t1),
+          |  (SELECT count(distinct c2) FROM t1)
+          |""".stripMargin),
+        Row(2, 2))
+
+      // In this case we don't merge the subqueries as `RewriteDistinctAggregates` kicks off for the
+      // 2 subqueries first but `MergeScalarSubqueries` is not prepared for the `Expand` nodes that
+      // are inserted by the rewrite.
+      checkAnswer(sql(
+        """
+          |SELECT
+          |  (SELECT count(distinct c1) + sum(distinct c2) FROM t1),
+          |  (SELECT count(distinct c2) + sum(distinct c1) FROM t1)
+          |""".stripMargin),
+        Row(8, 6))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org