You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yu...@apache.org on 2023/06/16 04:07:05 UTC
[spark] branch branch-3.3 updated: [SPARK-44040][SQL] Fix compute stats when AggregateExec node above QueryStageExec
This is an automated email from the ASF dual-hosted git repository.
yumwang pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new b9699062d3a [SPARK-44040][SQL] Fix compute stats when AggregateExec node above QueryStageExec
b9699062d3a is described below
commit b9699062d3a142cb1273fd0f290d5bb21b6c3002
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Fri Jun 16 11:18:38 2023 +0800
[SPARK-44040][SQL] Fix compute stats when AggregateExec node above QueryStageExec
### What changes were proposed in this pull request?
This PR fixes the statistics computation when a `BaseAggregateExec` node sits above a `QueryStageExec`.
For an aggregation, even when the number of shuffle output rows is 0, the final result may still contain 1 row. For example:
```sql
SELECT count(*) FROM tbl WHERE false;
```
The number of shuffle output rows is 0, but the final result contains 1 row. Please see the [UI](https://github.com/apache/spark/assets/5399861/9d9ad999-b3a9-433e-9caf-c0b931423891).
### Why are the changes needed?
Fix a data correctness issue. `OptimizeOneRowPlan` uses stats to remove `Aggregate`, so an incorrect row count causes it to remove an `Aggregate` that is still required:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.OptimizeOneRowPlan ===
!Aggregate [id#5L], [id#5L] Project [id#5L]
+- Union false, false +- Union false, false
:- LogicalQueryStage Aggregate [sum(id#0L) AS id#5L], HashAggregate(keys=[], functions=[sum(id#0L)]) :- LogicalQueryStage Aggregate [sum(id#0L) AS id#5L], HashAggregate(keys=[], functions=[sum(id#0L)])
+- LogicalQueryStage Aggregate [sum(id#18L) AS id#12L], HashAggregate(keys=[], functions=[sum(id#18L)]) +- LogicalQueryStage Aggregate [sum(id#18L) AS id#12L], HashAggregate(keys=[], functions=[sum(id#18L)])
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
Closes #41576 from wangyum/SPARK-44040.
Authored-by: Yuming Wang <yu...@ebay.com>
Signed-off-by: Yuming Wang <yu...@ebay.com>
(cherry picked from commit 55ba63c257b6617ec3d2aca5bc1d0989d4f29de8)
Signed-off-by: Yuming Wang <yu...@ebay.com>
---
.../spark/sql/execution/adaptive/LogicalQueryStage.scala | 11 +++++++++--
.../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 8 ++++++++
2 files changed, 17 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStage.scala
index 5e6f1b5a884..8ce2452cc14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStage.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, RepartitionOperation, Statistics}
import org.apache.spark.sql.catalyst.trees.TreePattern.{LOGICAL_QUERY_STAGE, REPARTITION_OPERATION, TreePattern}
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
/**
* The LogicalPlan wrapper for a [[QueryStageExec]], or a snippet of physical plan containing
@@ -53,8 +54,14 @@ case class LogicalQueryStage(
override def computeStats(): Statistics = {
// TODO this is not accurate when there is other physical nodes above QueryStageExec.
val physicalStats = physicalPlan.collectFirst {
- case s: QueryStageExec => s
- }.flatMap(_.computeStats())
+ case a: BaseAggregateExec if a.groupingExpressions.isEmpty =>
+ a.collectFirst {
+ case s: QueryStageExec => s.computeStats()
+ }.flatten.map { stat =>
+ if (stat.rowCount.contains(0)) stat.copy(rowCount = Some(1)) else stat
+ }
+ case s: QueryStageExec => s.computeStats()
+ }.flatten
if (physicalStats.isDefined) {
logDebug(s"Physical stats available as ${physicalStats.get} for plan: $physicalPlan")
} else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 0055b94fa06..ca481b6f944 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -2649,6 +2649,14 @@ class AdaptiveQueryExecSuite
assert(df.rdd.getNumPartitions == 3)
}
}
+
+ test("SPARK-44040: Fix compute stats when AggregateExec nodes above QueryStageExec") {
+ val emptyDf = spark.range(1).where("false")
+ val aggDf1 = emptyDf.agg(sum("id").as("id")).withColumn("name", lit("df1"))
+ val aggDf2 = emptyDf.agg(sum("id").as("id")).withColumn("name", lit("df2"))
+ val unionDF = aggDf1.union(aggDf2)
+ checkAnswer(unionDF.select("id").distinct, Seq(Row(null)))
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org