You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yu...@apache.org on 2023/06/16 04:07:05 UTC

[spark] branch branch-3.3 updated: [SPARK-44040][SQL] Fix compute stats when AggregateExec node above QueryStageExec

This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new b9699062d3a [SPARK-44040][SQL] Fix compute stats when AggregateExec node above QueryStageExec
b9699062d3a is described below

commit b9699062d3a142cb1273fd0f290d5bb21b6c3002
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Fri Jun 16 11:18:38 2023 +0800

    [SPARK-44040][SQL] Fix compute stats when AggregateExec node above QueryStageExec
    
    ### What changes were proposed in this pull request?
    
    This PR fixes statistics computation in `LogicalQueryStage` when a `BaseAggregateExec` node without grouping keys sits above a `QueryStageExec`.
    
    For a global aggregation (one without grouping keys), the final result still contains 1 row even when the number of shuffle output rows is 0. For example:
    ```sql
    SELECT count(*) FROM tbl WHERE false;
    ```
    
    Here the shuffle outputs 0 rows, but the final result has 1 row. Please see the [UI](https://github.com/apache/spark/assets/5399861/9d9ad999-b3a9-433e-9caf-c0b931423891).
    
    ### Why are the changes needed?
    
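    This fixes a data correctness issue. `OptimizeOneRowPlan` uses these stats to remove an `Aggregate`. Each `LogicalQueryStage` below the `Union` reported 0 rows, so the `Union` appeared to produce at most one row. The rule therefore replaced the grouping-only `Aggregate` (the `distinct`) with a `Project`. In reality, each stage produces one row, so the `Union` produces two rows and the `distinct` is still needed: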
    ```
    === Applying Rule org.apache.spark.sql.catalyst.optimizer.OptimizeOneRowPlan ===
    !Aggregate [id#5L], [id#5L]                                                                                   Project [id#5L]
     +- Union false, false                                                                                        +- Union false, false
        :- LogicalQueryStage Aggregate [sum(id#0L) AS id#5L], HashAggregate(keys=[], functions=[sum(id#0L)])         :- LogicalQueryStage Aggregate [sum(id#0L) AS id#5L], HashAggregate(keys=[], functions=[sum(id#0L)])
        +- LogicalQueryStage Aggregate [sum(id#18L) AS id#12L], HashAggregate(keys=[], functions=[sum(id#18L)])      +- LogicalQueryStage Aggregate [sum(id#18L) AS id#12L], HashAggregate(keys=[], functions=[sum(id#18L)])
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #41576 from wangyum/SPARK-44040.
    
    Authored-by: Yuming Wang <yu...@ebay.com>
    Signed-off-by: Yuming Wang <yu...@ebay.com>
    (cherry picked from commit 55ba63c257b6617ec3d2aca5bc1d0989d4f29de8)
    Signed-off-by: Yuming Wang <yu...@ebay.com>
---
 .../spark/sql/execution/adaptive/LogicalQueryStage.scala      | 11 +++++++++--
 .../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala |  8 ++++++++
 2 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStage.scala
index 5e6f1b5a884..8ce2452cc14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStage.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, RepartitionOperation, Statistics}
 import org.apache.spark.sql.catalyst.trees.TreePattern.{LOGICAL_QUERY_STAGE, REPARTITION_OPERATION, TreePattern}
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
 
 /**
  * The LogicalPlan wrapper for a [[QueryStageExec]], or a snippet of physical plan containing
@@ -53,8 +54,14 @@ case class LogicalQueryStage(
   override def computeStats(): Statistics = {
     // TODO this is not accurate when there is other physical nodes above QueryStageExec.
     val physicalStats = physicalPlan.collectFirst {
-      case s: QueryStageExec => s
-    }.flatMap(_.computeStats())
+      case a: BaseAggregateExec if a.groupingExpressions.isEmpty =>
+        a.collectFirst {
+          case s: QueryStageExec => s.computeStats()
+        }.flatten.map { stat =>
+          if (stat.rowCount.contains(0)) stat.copy(rowCount = Some(1)) else stat
+        }
+      case s: QueryStageExec => s.computeStats()
+    }.flatten
     if (physicalStats.isDefined) {
       logDebug(s"Physical stats available as ${physicalStats.get} for plan: $physicalPlan")
     } else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 0055b94fa06..ca481b6f944 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -2649,6 +2649,14 @@ class AdaptiveQueryExecSuite
       assert(df.rdd.getNumPartitions == 3)
     }
   }
+
+  test("SPARK-44040: Fix compute stats when AggregateExec nodes above QueryStageExec") {
+    val emptyDf = spark.range(1).where("false")
+    val aggDf1 = emptyDf.agg(sum("id").as("id")).withColumn("name", lit("df1"))
+    val aggDf2 = emptyDf.agg(sum("id").as("id")).withColumn("name", lit("df2"))
+    val unionDF = aggDf1.union(aggDf2)
+    checkAnswer(unionDF.select("id").distinct, Seq(Row(null)))
+  }
 }
 
 /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org