Posted to commits@spark.apache.org by we...@apache.org on 2021/07/16 08:04:01 UTC

[spark] branch branch-3.2 updated: [SPARK-35710][SQL] Support DPP + AQE when there is no reused broadcast exchange

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new de3b8b9  [SPARK-35710][SQL] Support DPP + AQE when there is no reused broadcast exchange
de3b8b9 is described below

commit de3b8b996f4f8627318f63dac5dcd6427f4f1bc7
Author: Ke Jia <ke...@intel.com>
AuthorDate: Fri Jul 16 16:01:07 2021 +0800

    [SPARK-35710][SQL] Support DPP + AQE when there is no reused broadcast exchange
    
    ### What changes were proposed in this pull request?
    This PR adds DPP + AQE support for the case where Spark can't reuse the broadcast exchange but executing the DPP subquery on its own is still cheaper.
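    
    A hedged illustration of the scenario (the table and column names below
    are invented for the example, not taken from the patch): with AQE enabled
    and reuseBroadcastOnly disabled, the pruning subquery can now run on its
    own when there is no broadcast exchange to reuse.
    
        spark.conf.set("spark.sql.adaptive.enabled", "true")
        spark.conf.set(
          "spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly", "false")
        // A selective filter on the dimension side can still prune fact-table
        // partitions even when the join is not a broadcast hash join.
        spark.sql(
          """SELECT f.value
            |FROM fact f JOIN dim d
            |  ON f.part_col = d.key AND d.filter_col = 'x'
            |""".stripMargin)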
    
    ### Why are the changes needed?
    Improves the integration of AQE and DPP.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added new unit tests.
    
    Closes #32861 from JkSelf/supportDPP3.
    
    Lead-authored-by: Ke Jia <ke...@intel.com>
    Co-authored-by: Wenchen Fan <cl...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit c1b3f86c58ff9a54d2d422a8332ed3e1080cc86c)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../execution/SubqueryAdaptiveBroadcastExec.scala  |  3 ++
 .../adaptive/InsertAdaptiveSparkPlan.scala         |  3 +-
 .../PlanAdaptiveDynamicPruningFilters.scala        | 22 +++++++++++--
 .../spark/sql/DynamicPartitionPruningSuite.scala   | 36 +++++++++-------------
 4 files changed, 39 insertions(+), 25 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryAdaptiveBroadcastExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryAdaptiveBroadcastExec.scala
index b21bfef..e7092ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryAdaptiveBroadcastExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryAdaptiveBroadcastExec.scala
@@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.errors.QueryExecutionErrors
 
 /**
@@ -34,6 +35,8 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
 case class SubqueryAdaptiveBroadcastExec(
     name: String,
     index: Int,
+    onlyInBroadcast: Boolean,
+    @transient buildPlan: LogicalPlan,
     buildKeys: Seq[Expression],
     child: SparkPlan) extends BaseSubqueryExec with UnaryExecNode {
 
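A side note on the new @transient buildPlan: LogicalPlan field above: the
logical plan is only needed on the driver for re-planning the pruning
subquery, and @transient keeps it out of Java serialization of the physical
node. A minimal, hedged sketch of that behavior (plain Scala, not the
patch's code):

    import java.io._

    // The @transient field is skipped by Java serialization and
    // deserializes to null; the other fields survive the round trip.
    case class DriverOnlyState(name: String, @transient plan: AnyRef)

    object TransientDemo extends App {
      val node = DriverOnlyState("dpp", new Object)
      val out = new ByteArrayOutputStream()
      new ObjectOutputStream(out).writeObject(node)
      val back = new ObjectInputStream(
        new ByteArrayInputStream(out.toByteArray)).readObject()
        .asInstanceOf[DriverOnlyState]
      assert(back.name == "dpp")
      assert(back.plan == null)
    }
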
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index 2bcfa1b..c92878c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -140,7 +140,8 @@ case class InsertAdaptiveSparkPlan(
 
         val name = s"dynamicpruning#${exprId.id}"
         val subquery = SubqueryAdaptiveBroadcastExec(
-          name, broadcastKeyIndex, buildKeys, executedPlan)
+          name, broadcastKeyIndex, onlyInBroadcast,
+          buildPlan, buildKeys, executedPlan)
         subqueryMap.put(exprId.id, subquery)
       case _ =>
     }))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
index 68d4a8c..f9c1bbe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.sql.execution.adaptive
 
-import org.apache.spark.sql.catalyst.expressions.{BindReferences, DynamicPruningExpression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, BindReferences, DynamicPruningExpression, Literal}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
+import org.apache.spark.sql.catalyst.plans.logical.Aggregate
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.execution._
@@ -38,7 +39,7 @@ case class PlanAdaptiveDynamicPruningFilters(
     plan.transformAllExpressionsWithPruning(
       _.containsAllPatterns(DYNAMIC_PRUNING_EXPRESSION, IN_SUBQUERY_EXEC)) {
       case DynamicPruningExpression(InSubqueryExec(
-          value, SubqueryAdaptiveBroadcastExec(name, index, buildKeys,
+          value, SubqueryAdaptiveBroadcastExec(name, index, onlyInBroadcast, buildPlan, buildKeys,
           adaptivePlan: AdaptiveSparkPlanExec), exprId, _)) =>
         val packedKeys = BindReferences.bindReferences(
           HashJoin.rewriteKeyExpr(buildKeys), adaptivePlan.executedPlan.output)
@@ -62,8 +63,23 @@ case class PlanAdaptiveDynamicPruningFilters(
           val broadcastValues = SubqueryBroadcastExec(
             name, index, buildKeys, newAdaptivePlan)
           DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
-        } else {
+        } else if (onlyInBroadcast) {
           DynamicPruningExpression(Literal.TrueLiteral)
+        } else {
+          // We need to apply an aggregate on the buildPlan so that it gets column-pruned.
+          val alias = Alias(buildKeys(index), buildKeys(index).toString)()
+          val aggregate = Aggregate(Seq(alias), Seq(alias), buildPlan)
+
+          val session = adaptivePlan.context.session
+          val planner = session.sessionState.planner
+          // Here we can't call QueryExecution.prepareExecutedPlan() to get the
+          // sparkPlan as in the non-AQE case, because that would apply the
+          // physical plan optimization rules twice: once in the AQE framework
+          // and again in prepareExecutedPlan().
+          val sparkPlan = QueryExecution.createSparkPlan(session, planner, aggregate)
+          val newAdaptivePlan = adaptivePlan.copy(inputPlan = sparkPlan)
+          val values = SubqueryExec(name, newAdaptivePlan)
+          DynamicPruningExpression(InSubqueryExec(value, values, exprId))
         }
     }
   }
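What the new branch above computes, in plain terms: instead of dropping the
pruning filter when no broadcast can be reused, it runs the filter as an
ordinary subquery over the distinct build-side join keys. A hedged,
self-contained illustration (table and column names are invented, not from
the patch):

    import org.apache.spark.sql.SparkSession

    object DppSubquerySketch extends App {
      val spark = SparkSession.builder()
        .master("local[*]").appName("dpp-subquery-sketch").getOrCreate()
      import spark.implicits._

      Seq((1, "x"), (2, "y")).toDF("key", "filter_col")
        .createOrReplaceTempView("dim")
      Seq((1, 10L), (2, 20L), (3, 30L)).toDF("part_col", "value")
        .createOrReplaceTempView("fact")

      // The Aggregate(Seq(alias), Seq(alias), buildPlan) built above is
      // equivalent to a DISTINCT projection of the join key, used as an
      // IN-subquery filter on the partitioned side:
      spark.sql(
        """SELECT f.* FROM fact f
          |WHERE f.part_col IN
          |  (SELECT DISTINCT key FROM dim WHERE filter_col = 'x')
          |""".stripMargin).show()

      spark.stop()
    }
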
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index b175701..38527fb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -277,8 +277,7 @@ abstract class DynamicPartitionPruningSuiteBase
   /**
    * Test the result of a simple join on mock-up tables
    */
-  test("simple inner join triggers DPP with mock-up tables",
-    DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+  test("simple inner join triggers DPP with mock-up tables") {
     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
       SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
       SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
@@ -353,8 +352,7 @@ abstract class DynamicPartitionPruningSuiteBase
    * (2) DPP should be triggered only for certain join types
    * (3) DPP should trigger only when we have attributes on both sides of the join condition
    */
-  test("DPP triggers only for certain types of query",
-    DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+  test("DPP triggers only for certain types of query") {
     withSQLConf(
       SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
       SQLConf.DYNAMIC_PARTITION_PRUNING_PRUNING_SIDE_EXTRA_FILTER_RATIO.key -> "1") {
@@ -449,11 +447,11 @@ abstract class DynamicPartitionPruningSuiteBase
   /**
    * The filtering policy has a fallback when the stats are unavailable
    */
-  test("filtering ratio policy fallback",
-    DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+  test("filtering ratio policy fallback") {
     withSQLConf(
       SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
-      SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
+      SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false",
+      SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName) {
       Given("no stats and selective predicate")
       withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
         SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "true") {
@@ -520,8 +518,7 @@ abstract class DynamicPartitionPruningSuiteBase
   /**
    *  The filtering ratio policy performs best when it uses cardinality estimates
    */
-  test("filtering ratio policy with stats when the broadcast pruning is disabled",
-    DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+  test("filtering ratio policy with stats when the broadcast pruning is disabled") {
     withSQLConf(
       SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
       SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
@@ -714,8 +711,7 @@ abstract class DynamicPartitionPruningSuiteBase
     }
   }
 
-  test("partition pruning in broadcast hash joins",
-    DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+  test("partition pruning in broadcast hash joins") {
     Given("disable broadcast pruning and disable subquery duplication")
     withSQLConf(
       SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
@@ -1022,8 +1018,7 @@ abstract class DynamicPartitionPruningSuiteBase
     }
   }
 
-  test("avoid reordering broadcast join keys to match input hash partitioning",
-    DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+  test("avoid reordering broadcast join keys to match input hash partitioning") {
     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       withTable("large", "dimTwo", "dimThree") {
@@ -1147,9 +1142,10 @@ abstract class DynamicPartitionPruningSuiteBase
     }
   }
 
-  test("join key with multiple references on the filtering plan",
-    DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
-    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
+  test("join key with multiple references on the filtering plan") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
+      SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName
+    ) {
       // When AQE is enabled, the ReusedExchange is inserted at execution time.
       withTable("fact", "dim") {
         spark.range(100).select(
@@ -1209,8 +1205,7 @@ abstract class DynamicPartitionPruningSuiteBase
   }
 
   test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
-    "canonicalization and exchange reuse",
-    DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+    "canonicalization and exchange reuse") {
     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
       withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
         val df = sql(
@@ -1222,7 +1217,7 @@ abstract class DynamicPartitionPruningSuiteBase
           """.stripMargin)
 
         checkPartitionPruningPredicate(df, false, false)
-        val reuseExchangeNodes = df.queryExecution.executedPlan.collect {
+        val reuseExchangeNodes = collect(df.queryExecution.executedPlan) {
           case se: ReusedExchangeExec => se
         }
         assert(reuseExchangeNodes.size == 1, "Expected plan to contain 1 ReusedExchangeExec " +
@@ -1418,8 +1413,7 @@ abstract class DynamicPartitionPruningSuiteBase
     }
   }
 
-  test("SPARK-32855: Filtering side can not broadcast by join type",
-    DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+  test("SPARK-32855: Filtering side can not broadcast by join type") {
     withSQLConf(
       SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
       SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",

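A usage note on the ADAPTIVE_OPTIMIZER_EXCLUDED_RULES settings added in the
tests above: excluding AQEPropagateEmptyRelation keeps AQE from collapsing an
empty build side before the DPP assertions run. A hedged sketch of the same
exclusion through the public config key (assuming the rule name resolves to
the rule's fully qualified class name, as Rule.ruleName does by default):

    spark.conf.set(
      "spark.sql.adaptive.optimizer.excludedRules",
      "org.apache.spark.sql.execution.adaptive.AQEPropagateEmptyRelation")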