You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/07/07 14:49:28 UTC

[spark] branch branch-3.2 updated: [SPARK-39447][SQL][3.2] Avoid AssertionError in AdaptiveSparkPlanExec.doExecuteBroadcast

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 32aff86477a [SPARK-39447][SQL][3.2] Avoid AssertionError in AdaptiveSparkPlanExec.doExecuteBroadcast
32aff86477a is described below

commit 32aff86477ac001b0ee047db08591d89e90c6eb8
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Thu Jul 7 22:49:03 2022 +0800

    [SPARK-39447][SQL][3.2] Avoid AssertionError in AdaptiveSparkPlanExec.doExecuteBroadcast
    
    This is a backport of https://github.com/apache/spark/pull/36974 for branch-3.2
    
    ### What changes were proposed in this pull request?
    
    Match on `inputPlan` instead of `currentPhysicalPlan` when we restore the broadcast exchange for DPP.
    
    ### Why are the changes needed?
    
    The `currentPhysicalPlan` can be wrapped in a broadcast query stage, so it is not safe to match on it.
    This happens, for example, when the broadcast exchange added by DPP runs before the normal broadcast exchange (e.g. one introduced by a join).
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this is a bug fix.
    
    ### How was this patch tested?
    
    Added a test.
    
    Closes #37087 from ulysses-you/inputplan-3.2.
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala    |  2 +-
 .../spark/sql/DynamicPartitionPruningSuite.scala      | 19 +++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index e6c8be1397e..7aeb1c34329 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -658,7 +658,7 @@ case class AdaptiveSparkPlanExec(
       // node to prevent the loss of the `BroadcastExchangeExec` node in DPP subquery.
       // Here, we also need to avoid to insert the `BroadcastExchangeExec` node when the newPlan is
       // already the `BroadcastExchangeExec` plan after apply the `LogicalQueryStageStrategy` rule.
-      val finalPlan = currentPhysicalPlan match {
+      val finalPlan = inputPlan match {
         case b: BroadcastExchangeLike
           if (!newPlan.isInstanceOf[BroadcastExchangeLike]) => b.withNewChildren(Seq(newPlan))
         case _ => newPlan
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index 89749e7de00..91176717774 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1597,6 +1597,25 @@ class DynamicPartitionPruningV1SuiteAEOff extends DynamicPartitionPruningV1Suite
 class DynamicPartitionPruningV1SuiteAEOn extends DynamicPartitionPruningV1Suite
   with EnableAdaptiveExecutionSuite {
 
+  test("SPARK-39447: Avoid AssertionError in AdaptiveSparkPlanExec.doExecuteBroadcast") {
+    val df = sql(
+      """
+        |WITH empty_result AS (
+        |  SELECT * FROM fact_stats WHERE product_id < 0
+        |)
+        |SELECT *
+        |FROM   (SELECT /*+ SHUFFLE_MERGE(fact_sk) */ empty_result.store_id
+        |        FROM   fact_sk
+        |               JOIN empty_result
+        |                 ON fact_sk.product_id = empty_result.product_id) t2
+        |       JOIN empty_result
+        |         ON t2.store_id = empty_result.store_id
+      """.stripMargin)
+
+    checkPartitionPruningPredicate(df, false, false)
+    checkAnswer(df, Nil)
+  }
+
   test("SPARK-37995: PlanAdaptiveDynamicPruningFilters should use prepareExecutedPlan " +
     "rather than createSparkPlan to re-plan subquery") {
     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org