You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/02/13 11:43:56 UTC

[spark] branch branch-3.0 updated: [SPARK-30528][SQL] Turn off DPP subquery duplication by default

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 59a13c9  [SPARK-30528][SQL] Turn off DPP subquery duplication by default
59a13c9 is described below

commit 59a13c9b7bc3b3aa5b5bc30a60344f849c0f8012
Author: maryannxue <ma...@apache.org>
AuthorDate: Thu Feb 13 19:32:38 2020 +0800

    [SPARK-30528][SQL] Turn off DPP subquery duplication by default
    
    ### What changes were proposed in this pull request?
    This PR adds a config for Dynamic Partition Pruning subquery duplication and turns it off by default due to its potential performance regression.
    When planning a DPP filter, it seeks to reuse the broadcast exchange relation if the corresponding join is a BHJ with the filter relation being on the build side, otherwise it will either opt out or plan the filter as an un-reusable subquery duplication based on the cost estimate. However, the cost estimate is not accurate and only takes into account the table scan overhead, thus adding an un-reusable subquery duplication DPP filter can sometimes cause perf regression.
    This PR turns off the subquery duplication DPP filter by:
    1. adding a config `spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly` and setting it `true` by default.
    2. removing the existing meaningless config `spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcast` since we always want to reuse broadcast results if possible.
    
    ### Why are the changes needed?
    This is to fix a potential performance regression caused by DPP.
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Updated DynamicPartitionPruningSuite to test the new configuration.
    
    Closes #27551 from maryannxue/spark-30528.
    
    Authored-by: maryannxue <ma...@apache.org>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 453d5261b22ebcdd5886e65ab9d0d9857051e76a)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  12 +-
 .../sql/dynamicpruning/PartitionPruning.scala      |   4 +-
 .../dynamicpruning/PlanDynamicPruningFilters.scala |   5 +-
 .../spark/sql/DynamicPartitionPruningSuite.scala   | 183 ++++++++-------------
 .../scala/org/apache/spark/sql/ExplainSuite.scala  |   3 +-
 5 files changed, 82 insertions(+), 125 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 442711d..19c94e2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -259,11 +259,11 @@ object SQLConf {
     .doubleConf
     .createWithDefault(0.5)
 
-  val DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST =
-    buildConf("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcast")
+  val DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY =
+    buildConf("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly")
       .internal()
-      .doc("When true, dynamic partition pruning will seek to reuse the broadcast results from " +
-        "a broadcast hash join operation.")
+      .doc("When true, dynamic partition pruning will only apply when the broadcast exchange of " +
+        "a broadcast hash join operation can be reused as the dynamic pruning filter.")
       .booleanConf
       .createWithDefault(true)
 
@@ -2303,8 +2303,8 @@ class SQLConf extends Serializable with Logging {
   def dynamicPartitionPruningFallbackFilterRatio: Double =
     getConf(DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO)
 
-  def dynamicPartitionPruningReuseBroadcast: Boolean =
-    getConf(DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST)
+  def dynamicPartitionPruningReuseBroadcastOnly: Boolean =
+    getConf(DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY)
 
   def stateStoreProviderClass: String = getConf(STATE_STORE_PROVIDER_CLASS)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PartitionPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PartitionPruning.scala
index 48ba861..28f8f49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PartitionPruning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PartitionPruning.scala
@@ -86,7 +86,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
       filteringPlan: LogicalPlan,
       joinKeys: Seq[Expression],
       hasBenefit: Boolean): LogicalPlan = {
-    val reuseEnabled = SQLConf.get.dynamicPartitionPruningReuseBroadcast
+    val reuseEnabled = SQLConf.get.exchangeReuseEnabled
     val index = joinKeys.indexOf(filteringKey)
     if (hasBenefit || reuseEnabled) {
       // insert a DynamicPruning wrapper to identify the subquery during query planning
@@ -96,7 +96,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
           filteringPlan,
           joinKeys,
           index,
-          !hasBenefit),
+          !hasBenefit || SQLConf.get.dynamicPartitionPruningReuseBroadcastOnly),
         pruningPlan)
     } else {
       // abort dynamic partition pruning
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PlanDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PlanDynamicPruningFilters.scala
index 1398dc0..be00f72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PlanDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PlanDynamicPruningFilters.scala
@@ -36,9 +36,6 @@ import org.apache.spark.sql.internal.SQLConf
 case class PlanDynamicPruningFilters(sparkSession: SparkSession)
     extends Rule[SparkPlan] with PredicateHelper {
 
-  private def reuseBroadcast: Boolean =
-    SQLConf.get.dynamicPartitionPruningReuseBroadcast && SQLConf.get.exchangeReuseEnabled
-
   /**
    * Identify the shape in which keys of a given plan are broadcasted.
    */
@@ -59,7 +56,7 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession)
           sparkSession, sparkSession.sessionState.planner, buildPlan)
         // Using `sparkPlan` is a little hacky as it is based on the assumption that this rule is
         // the first to be applied (apart from `InsertAdaptiveSparkPlan`).
-        val canReuseExchange = reuseBroadcast && buildKeys.nonEmpty &&
+        val canReuseExchange = SQLConf.get.exchangeReuseEnabled && buildKeys.nonEmpty &&
           plan.find {
             case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _) =>
               left.sameResult(sparkPlan)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index e1f9bcc..f7b51d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -239,7 +239,8 @@ class DynamicPartitionPruningSuite
    */
   test("simple inner join triggers DPP with mock-up tables") {
     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false") {
+      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+      SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
       withTable("df1", "df2") {
         spark.range(1000)
           .select(col("id"), col("id").as("k"))
@@ -271,7 +272,8 @@ class DynamicPartitionPruningSuite
    */
   test("self-join on a partitioned table should not trigger DPP") {
     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false") {
+      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+      SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
       withTable("fact") {
         sql(
           s"""
@@ -302,7 +304,8 @@ class DynamicPartitionPruningSuite
    */
   test("static scan metrics") {
     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false") {
+      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+      SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
       withTable("fact", "dim") {
         spark.range(10)
           .map { x => Tuple3(x, x + 1, 0) }
@@ -370,7 +373,8 @@ class DynamicPartitionPruningSuite
   test("DPP should not be rewritten as an existential join") {
     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
       SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "1.5",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false") {
+      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+      SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
       val df = sql(
         s"""
            |SELECT * FROM product p WHERE p.store_id NOT IN
@@ -395,7 +399,7 @@ class DynamicPartitionPruningSuite
    */
   test("DPP triggers only for certain types of query") {
     withSQLConf(
-      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false") {
+      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false") {
       Given("dynamic partition pruning disabled")
       withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "false") {
         val df = sql(
@@ -433,7 +437,8 @@ class DynamicPartitionPruningSuite
       }
 
       Given("left-semi join with partition column on the left side")
-      withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
+      withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+        SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
         val df = sql(
           """
             |SELECT * FROM fact_sk f
@@ -457,7 +462,8 @@ class DynamicPartitionPruningSuite
       }
 
       Given("right outer join with partition column on the left side")
-      withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
+      withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+        SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
         val df = sql(
           """
             |SELECT * FROM fact_sk f RIGHT OUTER JOIN dim_store s
@@ -474,7 +480,8 @@ class DynamicPartitionPruningSuite
    */
   test("filtering ratio policy fallback") {
     withSQLConf(
-      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false") {
+      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+      SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
       Given("no stats and selective predicate")
       withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
         SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "true") {
@@ -543,7 +550,8 @@ class DynamicPartitionPruningSuite
    */
   test("filtering ratio policy with stats when the broadcast pruning is disabled") {
     withSQLConf(
-      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false") {
+      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+      SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
       Given("disabling the use of stats in the DPP heuristic")
       withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
         SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false") {
@@ -613,10 +621,7 @@ class DynamicPartitionPruningSuite
 
   test("partition pruning in broadcast hash joins with non-deterministic probe part") {
     Given("alias with simple join condition, and non-deterministic query")
-    withSQLConf(
-      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
       val df = sql(
         """
           |SELECT f.date_id, f.pid, f.sid FROM
@@ -630,10 +635,7 @@ class DynamicPartitionPruningSuite
     }
 
     Given("alias over multiple sub-queries with simple join condition")
-    withSQLConf(
-      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
       val df = sql(
         """
           |SELECT f.date_id, f.pid, f.sid FROM
@@ -651,10 +653,7 @@ class DynamicPartitionPruningSuite
 
   test("partition pruning in broadcast hash joins with aliases") {
     Given("alias with simple join condition, using attribute names only")
-    withSQLConf(
-      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
       val df = sql(
         """
           |SELECT f.date_id, f.pid, f.sid FROM
@@ -674,10 +673,7 @@ class DynamicPartitionPruningSuite
     }
 
     Given("alias with expr as join condition")
-    withSQLConf(
-      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
       val df = sql(
         """
           |SELECT f.date_id, f.pid, f.sid FROM
@@ -697,10 +693,7 @@ class DynamicPartitionPruningSuite
     }
 
     Given("alias over multiple sub-queries with simple join condition")
-    withSQLConf(
-      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
       val df = sql(
         """
           |SELECT f.date_id, f.pid, f.sid FROM
@@ -722,10 +715,7 @@ class DynamicPartitionPruningSuite
     }
 
     Given("alias over multiple sub-queries with simple join condition")
-    withSQLConf(
-      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
       val df = sql(
         """
           |SELECT f.date_id, f.pid_d as pid, f.sid_d as sid FROM
@@ -754,10 +744,8 @@ class DynamicPartitionPruningSuite
   test("partition pruning in broadcast hash joins") {
     Given("disable broadcast pruning and disable subquery duplication")
     withSQLConf(
-      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false",
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
+      SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
       val df = sql(
         """
           |SELECT f.date_id, f.product_id, f.units_sold, f.store_id FROM fact_stats f
@@ -777,9 +765,10 @@ class DynamicPartitionPruningSuite
 
     Given("disable reuse broadcast results and enable subquery duplication")
     withSQLConf(
-      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false",
+      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
       SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0.5") {
+      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0.5",
+      SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
       val df = sql(
         """
           |SELECT f.date_id, f.product_id, f.units_sold, f.store_id FROM fact_stats f
@@ -798,52 +787,47 @@ class DynamicPartitionPruningSuite
     }
 
     Given("enable reuse broadcast results and disable query duplication")
-      withSQLConf(
-        SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
-        SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
-        SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
-        val df = sql(
-          """
-            |SELECT f.date_id, f.product_id, f.units_sold, f.store_id FROM fact_stats f
-            |JOIN dim_stats s
-            |ON f.store_id = s.store_id WHERE s.country = 'DE'
-          """.stripMargin)
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
+      val df = sql(
+        """
+          |SELECT f.date_id, f.product_id, f.units_sold, f.store_id FROM fact_stats f
+          |JOIN dim_stats s
+          |ON f.store_id = s.store_id WHERE s.country = 'DE'
+        """.stripMargin)
 
-        checkPartitionPruningPredicate(df, false, true)
+      checkPartitionPruningPredicate(df, false, true)
 
-        checkAnswer(df,
-          Row(1030, 2, 10, 3) ::
-          Row(1040, 2, 50, 3) ::
-          Row(1050, 2, 50, 3) ::
-          Row(1060, 2, 50, 3) :: Nil
-        )
+      checkAnswer(df,
+        Row(1030, 2, 10, 3) ::
+        Row(1040, 2, 50, 3) ::
+        Row(1050, 2, 50, 3) ::
+        Row(1060, 2, 50, 3) :: Nil
+      )
     }
 
     Given("disable broadcast hash join and disable query duplication")
-      withSQLConf(
-        SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
-        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-        SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
-        SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
-        val df = sql(
-          """
-            |SELECT f.date_id, f.product_id, f.units_sold, f.store_id FROM fact_stats f
-            |JOIN dim_stats s
-            |ON f.store_id = s.store_id WHERE s.country = 'DE'
-          """.stripMargin)
+    withSQLConf(
+      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      val df = sql(
+        """
+          |SELECT f.date_id, f.product_id, f.units_sold, f.store_id FROM fact_stats f
+          |JOIN dim_stats s
+          |ON f.store_id = s.store_id WHERE s.country = 'DE'
+        """.stripMargin)
 
-        checkPartitionPruningPredicate(df, false, false)
+      checkPartitionPruningPredicate(df, false, false)
 
-        checkAnswer(df,
-          Row(1030, 2, 10, 3) ::
-          Row(1040, 2, 50, 3) ::
-          Row(1050, 2, 50, 3) ::
-          Row(1060, 2, 50, 3) :: Nil
-        )
+      checkAnswer(df,
+        Row(1030, 2, 10, 3) ::
+        Row(1040, 2, 50, 3) ::
+        Row(1050, 2, 50, 3) ::
+        Row(1060, 2, 50, 3) :: Nil
+      )
     }
 
     Given("disable broadcast hash join and enable query duplication")
-    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
       SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "true") {
       val df = sql(
@@ -865,9 +849,7 @@ class DynamicPartitionPruningSuite
   }
 
   test("broadcast a single key in a HashedRelation") {
-    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
       withTable("fact", "dim") {
         spark.range(100).select(
           $"id",
@@ -925,9 +907,7 @@ class DynamicPartitionPruningSuite
   }
 
   test("broadcast multiple keys in a LongHashedRelation") {
-    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
       withTable("fact", "dim") {
         spark.range(100).select(
           $"id",
@@ -962,9 +942,7 @@ class DynamicPartitionPruningSuite
   }
 
   test("broadcast multiple keys in an UnsafeHashedRelation") {
-    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
       withTable("fact", "dim") {
         spark.range(100).select(
           $"id",
@@ -999,9 +977,7 @@ class DynamicPartitionPruningSuite
   }
 
   test("different broadcast subqueries with identical children") {
-    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
       withTable("fact", "dim") {
         spark.range(100).select(
           $"id",
@@ -1073,7 +1049,7 @@ class DynamicPartitionPruningSuite
   }
 
   test("avoid reordering broadcast join keys to match input hash partitioning") {
-    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       withTable("large", "dimTwo", "dimThree") {
         spark.range(100).select(
@@ -1123,9 +1099,7 @@ class DynamicPartitionPruningSuite
    * duplicated partitioning keys, also used to uniquely identify the dynamic pruning filters.
    */
   test("dynamic partition pruning ambiguity issue across nested joins") {
-    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
       withTable("store", "date", "item") {
         spark.range(500)
           .select((($"id" + 30) % 50).as("ss_item_sk"),
@@ -1163,9 +1137,7 @@ class DynamicPartitionPruningSuite
   }
 
   test("cleanup any DPP filter that isn't pushed down due to expression id clashes") {
-    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
       withTable("fact", "dim") {
         spark.range(1000).select($"id".as("A"), $"id".as("AA"))
           .write.partitionBy("A").format(tableFormat).mode("overwrite").saveAsTable("fact")
@@ -1186,10 +1158,7 @@ class DynamicPartitionPruningSuite
   }
 
   test("cleanup any DPP filter that isn't pushed down due to non-determinism") {
-    withSQLConf(
-      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
       val df = sql(
         """
           |SELECT f.date_id, f.pid, f.sid FROM
@@ -1204,9 +1173,7 @@ class DynamicPartitionPruningSuite
   }
 
   test("join key with multiple references on the filtering plan") {
-    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0",
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
       // when enable AQE, the reusedExchange is inserted when executed.
       withTable("fact", "dim") {
@@ -1240,9 +1207,7 @@ class DynamicPartitionPruningSuite
   }
 
   test("Make sure dynamic pruning works on uncorrelated queries") {
-    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
       val df = sql(
         """
           |SELECT d.store_id,
@@ -1266,10 +1231,7 @@ class DynamicPartitionPruningSuite
 
   test("Plan broadcast pruning only when the broadcast can be reused") {
     Given("dynamic pruning filter on the build side")
-    withSQLConf(
-      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
       val df = sql(
         """
           |SELECT f.date_id, f.store_id, f.product_id, f.units_sold FROM fact_np f
@@ -1288,10 +1250,7 @@ class DynamicPartitionPruningSuite
     }
 
     Given("dynamic pruning filter on the probe side")
-    withSQLConf(
-      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
       val df = sql(
         """
           |SELECT /*+ BROADCAST(f)*/
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index d9f4d6d..b591705 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -239,7 +239,8 @@ class ExplainSuite extends QueryTest with SharedSparkSession {
   test("explain formatted - check presence of subquery in case of DPP") {
     withTable("df1", "df2") {
       withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
-        SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false") {
+        SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+        SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
         withTable("df1", "df2") {
           spark.range(1000).select(col("id"), col("id").as("k"))
             .write


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org