You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/11/14 08:06:42 UTC
[spark] branch master updated: [SPARK-38959][SQL][FOLLOWUP] Do not optimize subqueries twice
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 632784dec8a [SPARK-38959][SQL][FOLLOWUP] Do not optimize subqueries twice
632784dec8a is described below
commit 632784dec8a0c682a85a888c66e83f709e402e18
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Mon Nov 14 16:06:26 2022 +0800
[SPARK-38959][SQL][FOLLOWUP] Do not optimize subqueries twice
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/38557 . We found that some optimizer rules can't be applied twice (those in the `Once` batch), but running the rule `OptimizeSubqueries` twice breaks them, as it optimizes subqueries twice.
This PR partially reverts https://github.com/apache/spark/pull/38557 so that `OptimizeSubqueries` is still invoked from within `RowLevelOperationRuntimeGroupFiltering` (on newly added subqueries only). We don't fully revert https://github.com/apache/spark/pull/38557 because it's still beneficial to use an IN subquery directly instead of the DPP framework, as there is no join.
### Why are the changes needed?
Fix the optimizer.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
N/A
Closes #38626 from cloud-fan/follow.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala | 6 ++++--
.../sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala | 2 +-
.../sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala | 2 +-
.../dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala | 6 ++++--
4 files changed, 10 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 9624bf1fa9f..c61fd9ce10f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -51,8 +51,10 @@ class SparkOptimizer(
Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
Batch("PartitionPruning", Once,
PartitionPruning,
- RowLevelOperationRuntimeGroupFiltering,
- OptimizeSubqueries) :+
+ // We can't run `OptimizeSubqueries` in this batch, as it will optimize the subqueries
+ // twice which may break some optimizer rules that can only be applied once. The rule below
+ // only invokes `OptimizeSubqueries` to optimize newly added subqueries.
+ new RowLevelOperationRuntimeGroupFiltering(OptimizeSubqueries)) :+
Batch("InjectRuntimeFilter", FixedPoint(1),
InjectRuntimeFilter) :+
Batch("MergeScalarSubqueries", Once,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
index 21bc55110fe..9a780c11eef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelati
case class PlanAdaptiveDynamicPruningFilters(
rootPlan: AdaptiveSparkPlanExec) extends Rule[SparkPlan] with AdaptiveSparkPlanHelper {
def apply(plan: SparkPlan): SparkPlan = {
- if (!conf.dynamicPartitionPruningEnabled && !conf.runtimeRowLevelOperationGroupFilterEnabled) {
+ if (!conf.dynamicPartitionPruningEnabled) {
return plan
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
index df5e3ea1365..c9ff28eb045 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
@@ -45,7 +45,7 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[Sp
}
override def apply(plan: SparkPlan): SparkPlan = {
- if (!conf.dynamicPartitionPruningEnabled && !conf.runtimeRowLevelOperationGroupFilterEnabled) {
+ if (!conf.dynamicPartitionPruningEnabled) {
return plan
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
index bb5edc94fa5..f2b513e630b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
@@ -37,7 +37,8 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, Dat
*
* Note this rule only applies to group-based row-level operations.
*/
-object RowLevelOperationRuntimeGroupFiltering extends Rule[LogicalPlan] with PredicateHelper {
+class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])
+ extends Rule[LogicalPlan] with PredicateHelper {
import DataSourceV2Implicits._
@@ -64,7 +65,8 @@ object RowLevelOperationRuntimeGroupFiltering extends Rule[LogicalPlan] with Pre
Filter(dynamicPruningCond, r)
}
- replaceData.copy(query = newQuery)
+ // optimize subqueries to rewrite them as joins and trigger job planning
+ replaceData.copy(query = optimizeSubqueries(newQuery))
}
private def buildMatchingRowsPlan(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org