You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/07/16 08:04:01 UTC
[spark] branch branch-3.2 updated: [SPARK-35710][SQL] Support DPP +
AQE when there is no reused broadcast exchange
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new de3b8b9 [SPARK-35710][SQL] Support DPP + AQE when there is no reused broadcast exchange
de3b8b9 is described below
commit de3b8b996f4f8627318f63dac5dcd6427f4f1bc7
Author: Ke Jia <ke...@intel.com>
AuthorDate: Fri Jul 16 16:01:07 2021 +0800
[SPARK-35710][SQL] Support DPP + AQE when there is no reused broadcast exchange
### What changes were proposed in this pull request?
This PR adds DPP + AQE support for the case when Spark can't reuse the broadcast exchange but executing the DPP subquery is still cheaper.
### Why are the changes needed?
Improve AQE + DPP
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Adding a new unit test
Closes #32861 from JkSelf/supportDPP3.
Lead-authored-by: Ke Jia <ke...@intel.com>
Co-authored-by: Wenchen Fan <cl...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit c1b3f86c58ff9a54d2d422a8332ed3e1080cc86c)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../execution/SubqueryAdaptiveBroadcastExec.scala | 3 ++
.../adaptive/InsertAdaptiveSparkPlan.scala | 3 +-
.../PlanAdaptiveDynamicPruningFilters.scala | 22 +++++++++++--
.../spark/sql/DynamicPartitionPruningSuite.scala | 36 +++++++++-------------
4 files changed, 39 insertions(+), 25 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryAdaptiveBroadcastExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryAdaptiveBroadcastExec.scala
index b21bfef..e7092ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryAdaptiveBroadcastExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryAdaptiveBroadcastExec.scala
@@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.errors.QueryExecutionErrors
/**
@@ -34,6 +35,8 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
case class SubqueryAdaptiveBroadcastExec(
name: String,
index: Int,
+ onlyInBroadcast: Boolean,
+ @transient buildPlan: LogicalPlan,
buildKeys: Seq[Expression],
child: SparkPlan) extends BaseSubqueryExec with UnaryExecNode {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index 2bcfa1b..c92878c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -140,7 +140,8 @@ case class InsertAdaptiveSparkPlan(
val name = s"dynamicpruning#${exprId.id}"
val subquery = SubqueryAdaptiveBroadcastExec(
- name, broadcastKeyIndex, buildKeys, executedPlan)
+ name, broadcastKeyIndex, onlyInBroadcast,
+ buildPlan, buildKeys, executedPlan)
subqueryMap.put(exprId.id, subquery)
case _ =>
}))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
index 68d4a8c..f9c1bbe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
@@ -17,8 +17,9 @@
package org.apache.spark.sql.execution.adaptive
-import org.apache.spark.sql.catalyst.expressions.{BindReferences, DynamicPruningExpression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, BindReferences, DynamicPruningExpression, Literal}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
+import org.apache.spark.sql.catalyst.plans.logical.Aggregate
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.execution._
@@ -38,7 +39,7 @@ case class PlanAdaptiveDynamicPruningFilters(
plan.transformAllExpressionsWithPruning(
_.containsAllPatterns(DYNAMIC_PRUNING_EXPRESSION, IN_SUBQUERY_EXEC)) {
case DynamicPruningExpression(InSubqueryExec(
- value, SubqueryAdaptiveBroadcastExec(name, index, buildKeys,
+ value, SubqueryAdaptiveBroadcastExec(name, index, onlyInBroadcast, buildPlan, buildKeys,
adaptivePlan: AdaptiveSparkPlanExec), exprId, _)) =>
val packedKeys = BindReferences.bindReferences(
HashJoin.rewriteKeyExpr(buildKeys), adaptivePlan.executedPlan.output)
@@ -62,8 +63,23 @@ case class PlanAdaptiveDynamicPruningFilters(
val broadcastValues = SubqueryBroadcastExec(
name, index, buildKeys, newAdaptivePlan)
DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
- } else {
+ } else if (onlyInBroadcast) {
DynamicPruningExpression(Literal.TrueLiteral)
+ } else {
+ // we need to apply an aggregate on the buildPlan in order to be column pruned
+ val alias = Alias(buildKeys(index), buildKeys(index).toString)()
+ val aggregate = Aggregate(Seq(alias), Seq(alias), buildPlan)
+
+ val session = adaptivePlan.context.session
+ val planner = session.sessionState.planner
+ // Here we can't call the QueryExecution.prepareExecutedPlan() method to
+ // get the sparkPlan as Non-AQE use case, which will cause the physical
+ // plan optimization rules be inserted twice, once in AQE framework and
+ // another in prepareExecutedPlan() method.
+ val sparkPlan = QueryExecution.createSparkPlan(session, planner, aggregate)
+ val newAdaptivePlan = adaptivePlan.copy(inputPlan = sparkPlan)
+ val values = SubqueryExec(name, newAdaptivePlan)
+ DynamicPruningExpression(InSubqueryExec(value, values, exprId))
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index b175701..38527fb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -277,8 +277,7 @@ abstract class DynamicPartitionPruningSuiteBase
/**
* Test the result of a simple join on mock-up tables
*/
- test("simple inner join triggers DPP with mock-up tables",
- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+ test("simple inner join triggers DPP with mock-up tables") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
@@ -353,8 +352,7 @@ abstract class DynamicPartitionPruningSuiteBase
* (2) DPP should be triggered only for certain join types
* (3) DPP should trigger only when we have attributes on both sides of the join condition
*/
- test("DPP triggers only for certain types of query",
- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+ test("DPP triggers only for certain types of query") {
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.DYNAMIC_PARTITION_PRUNING_PRUNING_SIDE_EXTRA_FILTER_RATIO.key -> "1") {
@@ -449,11 +447,11 @@ abstract class DynamicPartitionPruningSuiteBase
/**
* The filtering policy has a fallback when the stats are unavailable
*/
- test("filtering ratio policy fallback",
- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+ test("filtering ratio policy fallback") {
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
- SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
+ SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false",
+ SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName) {
Given("no stats and selective predicate")
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "true") {
@@ -520,8 +518,7 @@ abstract class DynamicPartitionPruningSuiteBase
/**
* The filtering ratio policy performs best when it uses cardinality estimates
*/
- test("filtering ratio policy with stats when the broadcast pruning is disabled",
- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+ test("filtering ratio policy with stats when the broadcast pruning is disabled") {
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
@@ -714,8 +711,7 @@ abstract class DynamicPartitionPruningSuiteBase
}
}
- test("partition pruning in broadcast hash joins",
- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+ test("partition pruning in broadcast hash joins") {
Given("disable broadcast pruning and disable subquery duplication")
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
@@ -1022,8 +1018,7 @@ abstract class DynamicPartitionPruningSuiteBase
}
}
- test("avoid reordering broadcast join keys to match input hash partitioning",
- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+ test("avoid reordering broadcast join keys to match input hash partitioning") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
withTable("large", "dimTwo", "dimThree") {
@@ -1147,9 +1142,10 @@ abstract class DynamicPartitionPruningSuiteBase
}
}
- test("join key with multiple references on the filtering plan",
- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
- withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
+ test("join key with multiple references on the filtering plan") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
+ SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName
+ ) {
// when enable AQE, the reusedExchange is inserted when executed.
withTable("fact", "dim") {
spark.range(100).select(
@@ -1209,8 +1205,7 @@ abstract class DynamicPartitionPruningSuiteBase
}
test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
- "canonicalization and exchange reuse",
- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+ "canonicalization and exchange reuse") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(
@@ -1222,7 +1217,7 @@ abstract class DynamicPartitionPruningSuiteBase
""".stripMargin)
checkPartitionPruningPredicate(df, false, false)
- val reuseExchangeNodes = df.queryExecution.executedPlan.collect {
+ val reuseExchangeNodes = collect(df.queryExecution.executedPlan) {
case se: ReusedExchangeExec => se
}
assert(reuseExchangeNodes.size == 1, "Expected plan to contain 1 ReusedExchangeExec " +
@@ -1418,8 +1413,7 @@ abstract class DynamicPartitionPruningSuiteBase
}
}
- test("SPARK-32855: Filtering side can not broadcast by join type",
- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+ test("SPARK-32855: Filtering side can not broadcast by join type") {
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org