You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/02/13 11:43:56 UTC
[spark] branch branch-3.0 updated: [SPARK-30528][SQL] Turn off DPP
subquery duplication by default
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 59a13c9 [SPARK-30528][SQL] Turn off DPP subquery duplication by default
59a13c9 is described below
commit 59a13c9b7bc3b3aa5b5bc30a60344f849c0f8012
Author: maryannxue <ma...@apache.org>
AuthorDate: Thu Feb 13 19:32:38 2020 +0800
[SPARK-30528][SQL] Turn off DPP subquery duplication by default
### What changes were proposed in this pull request?
This PR adds a config for Dynamic Partition Pruning subquery duplication and turns it off by default, because it can potentially cause performance regressions.
When planning a DPP filter, it seeks to reuse the broadcast exchange relation if the corresponding join is a BHJ with the filter relation being on the build side; otherwise it will either opt out or plan the filter as a non-reusable duplicated subquery, based on the cost estimate. However, the cost estimate is not accurate and only takes into account the table scan overhead, so adding a non-reusable duplicated-subquery DPP filter can sometimes cause performance regressions.
This PR turns off the subquery duplication DPP filter by:
1. adding a config `spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly` and setting it `true` by default.
2. removing the existing meaningless config `spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcast` since we always want to reuse broadcast results if possible.
### Why are the changes needed?
This is to fix a potential performance regression caused by DPP.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Updated DynamicPartitionPruningSuite to test the new configuration.
Closes #27551 from maryannxue/spark-30528.
Authored-by: maryannxue <ma...@apache.org>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 453d5261b22ebcdd5886e65ab9d0d9857051e76a)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 12 +-
.../sql/dynamicpruning/PartitionPruning.scala | 4 +-
.../dynamicpruning/PlanDynamicPruningFilters.scala | 5 +-
.../spark/sql/DynamicPartitionPruningSuite.scala | 183 ++++++++-------------
.../scala/org/apache/spark/sql/ExplainSuite.scala | 3 +-
5 files changed, 82 insertions(+), 125 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 442711d..19c94e2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -259,11 +259,11 @@ object SQLConf {
.doubleConf
.createWithDefault(0.5)
- val DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST =
- buildConf("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcast")
+ val DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY =
+ buildConf("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly")
.internal()
- .doc("When true, dynamic partition pruning will seek to reuse the broadcast results from " +
- "a broadcast hash join operation.")
+ .doc("When true, dynamic partition pruning will only apply when the broadcast exchange of " +
+ "a broadcast hash join operation can be reused as the dynamic pruning filter.")
.booleanConf
.createWithDefault(true)
@@ -2303,8 +2303,8 @@ class SQLConf extends Serializable with Logging {
def dynamicPartitionPruningFallbackFilterRatio: Double =
getConf(DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO)
- def dynamicPartitionPruningReuseBroadcast: Boolean =
- getConf(DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST)
+ def dynamicPartitionPruningReuseBroadcastOnly: Boolean =
+ getConf(DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY)
def stateStoreProviderClass: String = getConf(STATE_STORE_PROVIDER_CLASS)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PartitionPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PartitionPruning.scala
index 48ba861..28f8f49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PartitionPruning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PartitionPruning.scala
@@ -86,7 +86,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
filteringPlan: LogicalPlan,
joinKeys: Seq[Expression],
hasBenefit: Boolean): LogicalPlan = {
- val reuseEnabled = SQLConf.get.dynamicPartitionPruningReuseBroadcast
+ val reuseEnabled = SQLConf.get.exchangeReuseEnabled
val index = joinKeys.indexOf(filteringKey)
if (hasBenefit || reuseEnabled) {
// insert a DynamicPruning wrapper to identify the subquery during query planning
@@ -96,7 +96,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
filteringPlan,
joinKeys,
index,
- !hasBenefit),
+ !hasBenefit || SQLConf.get.dynamicPartitionPruningReuseBroadcastOnly),
pruningPlan)
} else {
// abort dynamic partition pruning
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PlanDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PlanDynamicPruningFilters.scala
index 1398dc0..be00f72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PlanDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/PlanDynamicPruningFilters.scala
@@ -36,9 +36,6 @@ import org.apache.spark.sql.internal.SQLConf
case class PlanDynamicPruningFilters(sparkSession: SparkSession)
extends Rule[SparkPlan] with PredicateHelper {
- private def reuseBroadcast: Boolean =
- SQLConf.get.dynamicPartitionPruningReuseBroadcast && SQLConf.get.exchangeReuseEnabled
-
/**
* Identify the shape in which keys of a given plan are broadcasted.
*/
@@ -59,7 +56,7 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession)
sparkSession, sparkSession.sessionState.planner, buildPlan)
// Using `sparkPlan` is a little hacky as it is based on the assumption that this rule is
// the first to be applied (apart from `InsertAdaptiveSparkPlan`).
- val canReuseExchange = reuseBroadcast && buildKeys.nonEmpty &&
+ val canReuseExchange = SQLConf.get.exchangeReuseEnabled && buildKeys.nonEmpty &&
plan.find {
case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _) =>
left.sameResult(sparkPlan)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index e1f9bcc..f7b51d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -239,7 +239,8 @@ class DynamicPartitionPruningSuite
*/
test("simple inner join triggers DPP with mock-up tables") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false") {
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+ SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
withTable("df1", "df2") {
spark.range(1000)
.select(col("id"), col("id").as("k"))
@@ -271,7 +272,8 @@ class DynamicPartitionPruningSuite
*/
test("self-join on a partitioned table should not trigger DPP") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false") {
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+ SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
withTable("fact") {
sql(
s"""
@@ -302,7 +304,8 @@ class DynamicPartitionPruningSuite
*/
test("static scan metrics") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false") {
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+ SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
withTable("fact", "dim") {
spark.range(10)
.map { x => Tuple3(x, x + 1, 0) }
@@ -370,7 +373,8 @@ class DynamicPartitionPruningSuite
test("DPP should not be rewritten as an existential join") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "1.5",
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false") {
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+ SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
val df = sql(
s"""
|SELECT * FROM product p WHERE p.store_id NOT IN
@@ -395,7 +399,7 @@ class DynamicPartitionPruningSuite
*/
test("DPP triggers only for certain types of query") {
withSQLConf(
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false") {
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false") {
Given("dynamic partition pruning disabled")
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "false") {
val df = sql(
@@ -433,7 +437,8 @@ class DynamicPartitionPruningSuite
}
Given("left-semi join with partition column on the left side")
- withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+ SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
val df = sql(
"""
|SELECT * FROM fact_sk f
@@ -457,7 +462,8 @@ class DynamicPartitionPruningSuite
}
Given("right outer join with partition column on the left side")
- withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+ SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
val df = sql(
"""
|SELECT * FROM fact_sk f RIGHT OUTER JOIN dim_store s
@@ -474,7 +480,8 @@ class DynamicPartitionPruningSuite
*/
test("filtering ratio policy fallback") {
withSQLConf(
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false") {
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+ SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
Given("no stats and selective predicate")
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "true") {
@@ -543,7 +550,8 @@ class DynamicPartitionPruningSuite
*/
test("filtering ratio policy with stats when the broadcast pruning is disabled") {
withSQLConf(
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false") {
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+ SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
Given("disabling the use of stats in the DPP heuristic")
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false") {
@@ -613,10 +621,7 @@ class DynamicPartitionPruningSuite
test("partition pruning in broadcast hash joins with non-deterministic probe part") {
Given("alias with simple join condition, and non-deterministic query")
- withSQLConf(
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
- SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
"""
|SELECT f.date_id, f.pid, f.sid FROM
@@ -630,10 +635,7 @@ class DynamicPartitionPruningSuite
}
Given("alias over multiple sub-queries with simple join condition")
- withSQLConf(
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
- SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
"""
|SELECT f.date_id, f.pid, f.sid FROM
@@ -651,10 +653,7 @@ class DynamicPartitionPruningSuite
test("partition pruning in broadcast hash joins with aliases") {
Given("alias with simple join condition, using attribute names only")
- withSQLConf(
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
- SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
"""
|SELECT f.date_id, f.pid, f.sid FROM
@@ -674,10 +673,7 @@ class DynamicPartitionPruningSuite
}
Given("alias with expr as join condition")
- withSQLConf(
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
- SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
"""
|SELECT f.date_id, f.pid, f.sid FROM
@@ -697,10 +693,7 @@ class DynamicPartitionPruningSuite
}
Given("alias over multiple sub-queries with simple join condition")
- withSQLConf(
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
- SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
"""
|SELECT f.date_id, f.pid, f.sid FROM
@@ -722,10 +715,7 @@ class DynamicPartitionPruningSuite
}
Given("alias over multiple sub-queries with simple join condition")
- withSQLConf(
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
- SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
"""
|SELECT f.date_id, f.pid_d as pid, f.sid_d as sid FROM
@@ -754,10 +744,8 @@ class DynamicPartitionPruningSuite
test("partition pruning in broadcast hash joins") {
Given("disable broadcast pruning and disable subquery duplication")
withSQLConf(
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false",
- SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
- SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
- SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
+ SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
val df = sql(
"""
|SELECT f.date_id, f.product_id, f.units_sold, f.store_id FROM fact_stats f
@@ -777,9 +765,10 @@ class DynamicPartitionPruningSuite
Given("disable reuse broadcast results and enable subquery duplication")
withSQLConf(
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false",
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0.5") {
+ SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0.5",
+ SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
val df = sql(
"""
|SELECT f.date_id, f.product_id, f.units_sold, f.store_id FROM fact_stats f
@@ -798,52 +787,47 @@ class DynamicPartitionPruningSuite
}
Given("enable reuse broadcast results and disable query duplication")
- withSQLConf(
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
- SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
- val df = sql(
- """
- |SELECT f.date_id, f.product_id, f.units_sold, f.store_id FROM fact_stats f
- |JOIN dim_stats s
- |ON f.store_id = s.store_id WHERE s.country = 'DE'
- """.stripMargin)
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
+ val df = sql(
+ """
+ |SELECT f.date_id, f.product_id, f.units_sold, f.store_id FROM fact_stats f
+ |JOIN dim_stats s
+ |ON f.store_id = s.store_id WHERE s.country = 'DE'
+ """.stripMargin)
- checkPartitionPruningPredicate(df, false, true)
+ checkPartitionPruningPredicate(df, false, true)
- checkAnswer(df,
- Row(1030, 2, 10, 3) ::
- Row(1040, 2, 50, 3) ::
- Row(1050, 2, 50, 3) ::
- Row(1060, 2, 50, 3) :: Nil
- )
+ checkAnswer(df,
+ Row(1030, 2, 10, 3) ::
+ Row(1040, 2, 50, 3) ::
+ Row(1050, 2, 50, 3) ::
+ Row(1060, 2, 50, 3) :: Nil
+ )
}
Given("disable broadcast hash join and disable query duplication")
- withSQLConf(
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
- SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
- SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
- SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
- val df = sql(
- """
- |SELECT f.date_id, f.product_id, f.units_sold, f.store_id FROM fact_stats f
- |JOIN dim_stats s
- |ON f.store_id = s.store_id WHERE s.country = 'DE'
- """.stripMargin)
+ withSQLConf(
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ val df = sql(
+ """
+ |SELECT f.date_id, f.product_id, f.units_sold, f.store_id FROM fact_stats f
+ |JOIN dim_stats s
+ |ON f.store_id = s.store_id WHERE s.country = 'DE'
+ """.stripMargin)
- checkPartitionPruningPredicate(df, false, false)
+ checkPartitionPruningPredicate(df, false, false)
- checkAnswer(df,
- Row(1030, 2, 10, 3) ::
- Row(1040, 2, 50, 3) ::
- Row(1050, 2, 50, 3) ::
- Row(1060, 2, 50, 3) :: Nil
- )
+ checkAnswer(df,
+ Row(1030, 2, 10, 3) ::
+ Row(1040, 2, 50, 3) ::
+ Row(1050, 2, 50, 3) ::
+ Row(1060, 2, 50, 3) :: Nil
+ )
}
Given("disable broadcast hash join and enable query duplication")
- withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "true") {
val df = sql(
@@ -865,9 +849,7 @@ class DynamicPartitionPruningSuite
}
test("broadcast a single key in a HashedRelation") {
- withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
- SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
withTable("fact", "dim") {
spark.range(100).select(
$"id",
@@ -925,9 +907,7 @@ class DynamicPartitionPruningSuite
}
test("broadcast multiple keys in a LongHashedRelation") {
- withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
- SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
withTable("fact", "dim") {
spark.range(100).select(
$"id",
@@ -962,9 +942,7 @@ class DynamicPartitionPruningSuite
}
test("broadcast multiple keys in an UnsafeHashedRelation") {
- withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
- SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
withTable("fact", "dim") {
spark.range(100).select(
$"id",
@@ -999,9 +977,7 @@ class DynamicPartitionPruningSuite
}
test("different broadcast subqueries with identical children") {
- withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
- SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
withTable("fact", "dim") {
spark.range(100).select(
$"id",
@@ -1073,7 +1049,7 @@ class DynamicPartitionPruningSuite
}
test("avoid reordering broadcast join keys to match input hash partitioning") {
- withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
withTable("large", "dimTwo", "dimThree") {
spark.range(100).select(
@@ -1123,9 +1099,7 @@ class DynamicPartitionPruningSuite
* duplicated partitioning keys, also used to uniquely identify the dynamic pruning filters.
*/
test("dynamic partition pruning ambiguity issue across nested joins") {
- withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
- SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
withTable("store", "date", "item") {
spark.range(500)
.select((($"id" + 30) % 50).as("ss_item_sk"),
@@ -1163,9 +1137,7 @@ class DynamicPartitionPruningSuite
}
test("cleanup any DPP filter that isn't pushed down due to expression id clashes") {
- withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
- SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
withTable("fact", "dim") {
spark.range(1000).select($"id".as("A"), $"id".as("AA"))
.write.partitionBy("A").format(tableFormat).mode("overwrite").saveAsTable("fact")
@@ -1186,10 +1158,7 @@ class DynamicPartitionPruningSuite
}
test("cleanup any DPP filter that isn't pushed down due to non-determinism") {
- withSQLConf(
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
- SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
"""
|SELECT f.date_id, f.pid, f.sid FROM
@@ -1204,9 +1173,7 @@ class DynamicPartitionPruningSuite
}
test("join key with multiple references on the filtering plan") {
- withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
- SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0",
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
// when enable AQE, the reusedExchange is inserted when executed.
withTable("fact", "dim") {
@@ -1240,9 +1207,7 @@ class DynamicPartitionPruningSuite
}
test("Make sure dynamic pruning works on uncorrelated queries") {
- withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
- SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
"""
|SELECT d.store_id,
@@ -1266,10 +1231,7 @@ class DynamicPartitionPruningSuite
test("Plan broadcast pruning only when the broadcast can be reused") {
Given("dynamic pruning filter on the build side")
- withSQLConf(
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
- SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
"""
|SELECT f.date_id, f.store_id, f.product_id, f.units_sold FROM fact_np f
@@ -1288,10 +1250,7 @@ class DynamicPartitionPruningSuite
}
Given("dynamic pruning filter on the probe side")
- withSQLConf(
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
- SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "0") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
"""
|SELECT /*+ BROADCAST(f)*/
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index d9f4d6d..b591705 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -239,7 +239,8 @@ class ExplainSuite extends QueryTest with SharedSparkSession {
test("explain formatted - check presence of subquery in case of DPP") {
withTable("df1", "df2") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false") {
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+ SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
withTable("df1", "df2") {
spark.range(1000).select(col("id"), col("id").as("k"))
.write
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org