Posted to commits@spark.apache.org by yu...@apache.org on 2023/02/06 13:02:24 UTC
[spark] branch branch-3.4 updated: [SPARK-42346][SQL] Rewrite distinct aggregates after subquery merge
This is an automated email from the ASF dual-hosted git repository.
yumwang pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 3173ce94304 [SPARK-42346][SQL] Rewrite distinct aggregates after subquery merge
3173ce94304 is described below
commit 3173ce943044cb23b4fd0a21bf1670a30193c09d
Author: Peter Toth <pe...@gmail.com>
AuthorDate: Mon Feb 6 20:36:57 2023 +0800
[SPARK-42346][SQL] Rewrite distinct aggregates after subquery merge
### What changes were proposed in this pull request?
Unfortunately, https://github.com/apache/spark/pull/32298 introduced a regression from Spark 3.2 to 3.3: after that change, a merged subquery can contain multiple distinct aggregates over different columns. Those aggregates need to be rewritten by the `RewriteDistinctAggregates` rule to get correct results. This PR fixes that by running `RewriteDistinctAggregates` right after `MergeScalarSubqueries` in the same optimizer batch.
### Why are the changes needed?
The following query:
```
SELECT
(SELECT count(distinct c1) FROM t1),
(SELECT count(distinct c2) FROM t1)
```
currently fails with:
```
java.lang.IllegalStateException: You hit a query analyzer bug. Please report your query to Spark user mailing list.
at org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:538)
```
but works again after this PR.
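To make the failure mode concrete, here is a toy model in plain Scala. Every class and function name in it is hypothetical; none of it is Spark's API. It assumes that the planner's `Aggregation` strategy rejects an aggregate whose distinct aggregates span more than one column set, and that a `Once` batch applies its rules in order. Merging the two scalar subqueries produces one aggregate with two distinct column sets. Running only the merge leaves that aggregate for the planner. Running the rewrite right after the merge, as the diff does, turns it into an Expand-based plan first.

```scala
// Toy model, not Spark code: all names below are hypothetical stand-ins.
object MergeThenRewriteSketch {
  // An aggregate function over a single column, optionally DISTINCT.
  final case class AggFn(name: String, column: String, distinct: Boolean)

  sealed trait Plan
  final case class Scan(table: String) extends Plan
  final case class Aggregate(aggs: Seq[AggFn], child: Plan) extends Plan
  // Stand-in for the Expand-based plan produced by the distinct-aggregate rewrite.
  final case class RewrittenAggregate(aggs: Seq[AggFn], child: Plan) extends Plan

  type Rule = Seq[Plan] => Seq[Plan]

  // Columns referenced by DISTINCT aggregates; more than one means several distinct groups.
  def distinctGroups(aggs: Seq[AggFn]): Set[String] =
    aggs.filter(_.distinct).map(_.column).toSet

  // Merge scalar subqueries that all aggregate the same child into a single aggregate.
  val mergeScalarSubqueries: Rule = plans => {
    val aggs = plans.collect { case a: Aggregate => a }
    val mergeable =
      plans.size > 1 && aggs.size == plans.size && aggs.map(_.child).distinct.size == 1
    if (mergeable) Seq(Aggregate(aggs.flatMap(_.aggs), aggs.head.child)) else plans
  }

  // Rewrite aggregates with several distinct groups into the Expand-based form.
  val rewriteDistinctAggregates: Rule = plans => plans.map {
    case Aggregate(aggs, child) if distinctGroups(aggs).size > 1 => RewrittenAggregate(aggs, child)
    case other => other
  }

  // A batch that runs each rule once, in order.
  def runOnce(rules: Seq[Rule], plans: Seq[Plan]): Seq[Plan] =
    rules.foldLeft(plans)((current, rule) => rule(current))

  // Planner sanity check modeled on the error quoted in the commit message.
  def planAll(plans: Seq[Plan]): Seq[String] = plans.map {
    case Aggregate(aggs, _) if distinctGroups(aggs).size > 1 =>
      throw new IllegalStateException("You hit a query analyzer bug.")
    case Aggregate(aggs, _) => s"HashAggregate(${aggs.size} aggs)"
    case RewrittenAggregate(aggs, _) => s"Expand + HashAggregate(${aggs.size} aggs)"
    case Scan(table) => s"Scan($table)"
  }

  val t1: Plan = Scan("t1")
  val subqueries: Seq[Plan] = Seq(
    Aggregate(Seq(AggFn("count", "c1", distinct = true)), t1),
    Aggregate(Seq(AggFn("count", "c2", distinct = true)), t1))

  // Before the fix: the batch contains only the merge rule.
  def before: Seq[Plan] = runOnce(Seq(mergeScalarSubqueries), subqueries)
  // After the fix: the rewrite runs right after the merge, in the same batch.
  def after: Seq[Plan] =
    runOnce(Seq(mergeScalarSubqueries, rewriteDistinctAggregates), subqueries)
}
```

Calling `planAll(before)` throws `IllegalStateException`, whereas `planAll(after)` yields a single Expand-based aggregate. Unmerged, each subquery has only one distinct group and plans fine on its own, which is why the problem only appeared once subqueries started being merged.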
### Does this PR introduce _any_ user-facing change?
Yes, the above query works again.
### How was this patch tested?
Added a new unit test to `SubquerySuite`.
Closes #39887 from peter-toth/SPARK-42346-rewrite-distinct-aggregates-after-subquery-merge.
Authored-by: Peter Toth <pe...@gmail.com>
Signed-off-by: Yuming Wang <yu...@ebay.com>
(cherry picked from commit 5940b9884b4b172f65220da7857d2952b137bc51)
Signed-off-by: Yuming Wang <yu...@ebay.com>
---
.../spark/sql/execution/SparkOptimizer.scala | 3 ++-
.../scala/org/apache/spark/sql/SubquerySuite.scala | 25 ++++++++++++++++++++++
2 files changed, 27 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 06e3888e7de..8c420838ca2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -58,7 +58,8 @@ class SparkOptimizer(
Batch("InjectRuntimeFilter", FixedPoint(1),
InjectRuntimeFilter) :+
Batch("MergeScalarSubqueries", Once,
- MergeScalarSubqueries) :+
+ MergeScalarSubqueries,
+ RewriteDistinctAggregates) :+
Batch("Pushdown Filters from PartitionPruning", fixedPoint,
PushDownPredicates) :+
Batch("Cleanup filters that cannot be pushed down", Once,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 5d667bbdd8c..e61f43d8847 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -2598,4 +2598,29 @@ class SubquerySuite extends QueryTest
Row("aa"))
}
}
+
+ test("SPARK-42346: Rewrite distinct aggregates after merging subqueries") {
+ withTempView("t1") {
+ Seq((1, 2), (3, 4)).toDF("c1", "c2").createOrReplaceTempView("t1")
+
+ checkAnswer(sql(
+ """
+ |SELECT
+ | (SELECT count(distinct c1) FROM t1),
+ | (SELECT count(distinct c2) FROM t1)
+ |""".stripMargin),
+ Row(2, 2))
+
+ // In this case we don't merge the subqueries as `RewriteDistinctAggregates` kicks off for the
+ // 2 subqueries first but `MergeScalarSubqueries` is not prepared for the `Expand` nodes that
+ // are inserted by the rewrite.
+ checkAnswer(sql(
+ """
+ |SELECT
+ | (SELECT count(distinct c1) + sum(distinct c2) FROM t1),
+ | (SELECT count(distinct c2) + sum(distinct c1) FROM t1)
+ |""".stripMargin),
+ Row(8, 6))
+ }
+ }
}
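The test comment above mentions the `Expand` nodes inserted by the rewrite. As a rough illustration of that technique, the plain-Scala sketch below evaluates the two subqueries of the second test query. It uses hypothetical names and is not Spark's implementation. It also ignores grouping keys and non-distinct aggregates, which the real rule also has to deal with. Each input row is expanded into one row per distinct group, tagged with a group id. The expanded rows are then deduplicated, and each distinct aggregate becomes an ordinary aggregate over the rows of its own group.

```scala
// Simplified model of an Expand-based distinct-aggregate rewrite (not Spark code).
object ExpandRewriteSketch {
  // One expanded row: a group id plus the column that group keeps.
  // The column that does not belong to the group is "nulled out", modeled with None.
  final case class Expanded(gid: Int, c1: Option[Int], c2: Option[Int])

  // Step 1: expand each (c1, c2) row into one row per distinct group.
  def expand(rows: Seq[(Int, Int)]): Seq[Expanded] =
    rows.flatMap { case (c1, c2) =>
      Seq(Expanded(1, Some(c1), None), Expanded(2, None, Some(c2)))
    }

  // Step 3 helper: the non-null values kept by one group id.
  def values(rows: Seq[Expanded], gid: Int): Seq[Int] =
    rows.filter(_.gid == gid).flatMap(r => if (gid == 1) r.c1 else r.c2)

  // Step 2: deduplicate on (gid, c1, c2), which is what grouping by those columns achieves.
  // Step 3: split the deduplicated values by group id.
  def distinctValues(rows: Seq[(Int, Int)]): (Seq[Int], Seq[Int]) = {
    val deduped = expand(rows).distinct
    (values(deduped, 1), values(deduped, 2))
  }

  // count(distinct c1) + sum(distinct c2)
  def subquery1(rows: Seq[(Int, Int)]): Long = {
    val (c1s, c2s) = distinctValues(rows)
    c1s.size.toLong + c2s.sum
  }

  // count(distinct c2) + sum(distinct c1)
  def subquery2(rows: Seq[(Int, Int)]): Long = {
    val (c1s, c2s) = distinctValues(rows)
    c2s.size.toLong + c1s.sum
  }
}
```

With the test data `Seq((1, 2), (3, 4))`, `subquery1` returns 8 and `subquery2` returns 6, matching `Row(8, 6)` in the test.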