Posted to commits@spark.apache.org by we...@apache.org on 2022/11/14 08:06:42 UTC

[spark] branch master updated: [SPARK-38959][SQL][FOLLOWUP] Do not optimize subqueries twice

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 632784dec8a [SPARK-38959][SQL][FOLLOWUP] Do not optimize subqueries twice
632784dec8a is described below

commit 632784dec8a0c682a85a888c66e83f709e402e18
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Mon Nov 14 16:06:26 2022 +0800

    [SPARK-38959][SQL][FOLLOWUP] Do not optimize subqueries twice
    
    ### What changes were proposed in this pull request?
    
    This is a followup of https://github.com/apache/spark/pull/38557 . We found that some optimizer rules must not be applied twice (those in the `Once` batch), but running the rule `OptimizeSubqueries` twice breaks this invariant, as every subquery then goes through those rules twice.
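    
    To make the hazard concrete, here is a minimal, self-contained sketch (toy `Plan` nodes and a hypothetical `AddMarker` rule, not Spark's actual classes): a rule written for a `Once` batch may assume the plan has not been transformed yet, so applying it a second time corrupts the result.
    
    ```scala
    sealed trait Plan
    case class Scan(table: String) extends Plan
    case class Marker(child: Plan) extends Plan
    
    // A rule meant for a `Once` batch: it assumes no Marker has been added yet.
    object AddMarker {
      def apply(plan: Plan): Plan = Marker(plan)
    }
    
    object OnceDemo extends App {
      val plan = Scan("t")
      println(AddMarker(plan))            // Marker(Scan(t)): correct after one pass
      println(AddMarker(AddMarker(plan))) // Marker(Marker(Scan(t))): broken after two
    }
    ```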
    
    This PR partially reverts https://github.com/apache/spark/pull/38557 so that `RowLevelOperationRuntimeGroupFiltering` once again invokes `OptimizeSubqueries` itself, on the subqueries it adds. We don't fully revert https://github.com/apache/spark/pull/38557 because it's still beneficial to use an IN subquery directly instead of going through the DPP framework, as there is no join.
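    
    A minimal sketch of the resulting wiring (hypothetical simplified types; the real change is in the diff below): the rule receives `optimizeSubqueries` as a constructor argument and applies it only to the query it just rewrote, so the rest of the plan's subqueries are not optimized a second time.
    
    ```scala
    trait Rule[P] { def apply(plan: P): P }
    case class LogicalPlan(description: String)
    
    // Hypothetical stand-in for RowLevelOperationRuntimeGroupFiltering:
    // it optimizes only the subquery it injects itself.
    class GroupFiltering(optimizeSubqueries: Rule[LogicalPlan]) extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = {
        val withInSubquery = LogicalPlan(plan.description + " + dynamic pruning IN subquery")
        optimizeSubqueries(withInSubquery) // only the newly added subquery is optimized
      }
    }
    
    object Demo extends App {
      val optimizeSubqueries = new Rule[LogicalPlan] {
        def apply(p: LogicalPlan): LogicalPlan = LogicalPlan(p.description + " (optimized)")
      }
      println(new GroupFiltering(optimizeSubqueries).apply(LogicalPlan("ReplaceData")))
    }
    ```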
    
    ### Why are the changes needed?
    
    Fix the optimizer.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    N/A
    
    Closes #38626 from cloud-fan/follow.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala  | 6 ++++--
 .../sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala  | 2 +-
 .../sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala    | 2 +-
 .../dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala     | 6 ++++--
 4 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 9624bf1fa9f..c61fd9ce10f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -51,8 +51,10 @@ class SparkOptimizer(
     Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
     Batch("PartitionPruning", Once,
       PartitionPruning,
-      RowLevelOperationRuntimeGroupFiltering,
-      OptimizeSubqueries) :+
+      // We can't run `OptimizeSubqueries` in this batch, as it will optimize the subqueries
+      // twice which may break some optimizer rules that can only be applied once. The rule below
+      // only invokes `OptimizeSubqueries` to optimize newly added subqueries.
+      new RowLevelOperationRuntimeGroupFiltering(OptimizeSubqueries)) :+
     Batch("InjectRuntimeFilter", FixedPoint(1),
       InjectRuntimeFilter) :+
     Batch("MergeScalarSubqueries", Once,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
index 21bc55110fe..9a780c11eef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelati
 case class PlanAdaptiveDynamicPruningFilters(
     rootPlan: AdaptiveSparkPlanExec) extends Rule[SparkPlan] with AdaptiveSparkPlanHelper {
   def apply(plan: SparkPlan): SparkPlan = {
-    if (!conf.dynamicPartitionPruningEnabled && !conf.runtimeRowLevelOperationGroupFilterEnabled) {
+    if (!conf.dynamicPartitionPruningEnabled) {
       return plan
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
index df5e3ea1365..c9ff28eb045 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
@@ -45,7 +45,7 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[Sp
   }
 
   override def apply(plan: SparkPlan): SparkPlan = {
-    if (!conf.dynamicPartitionPruningEnabled && !conf.runtimeRowLevelOperationGroupFilterEnabled) {
+    if (!conf.dynamicPartitionPruningEnabled) {
       return plan
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
index bb5edc94fa5..f2b513e630b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
@@ -37,7 +37,8 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, Dat
  *
  * Note this rule only applies to group-based row-level operations.
  */
-object RowLevelOperationRuntimeGroupFiltering extends Rule[LogicalPlan] with PredicateHelper {
+class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])
+  extends Rule[LogicalPlan] with PredicateHelper {
 
   import DataSourceV2Implicits._
 
@@ -64,7 +65,8 @@ object RowLevelOperationRuntimeGroupFiltering extends Rule[LogicalPlan] with Pre
           Filter(dynamicPruningCond, r)
       }
 
-      replaceData.copy(query = newQuery)
+      // optimize subqueries to rewrite them as joins and trigger job planning
+      replaceData.copy(query = optimizeSubqueries(newQuery))
   }
 
   private def buildMatchingRowsPlan(

