You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yu...@apache.org on 2022/06/04 00:31:22 UTC
[spark] branch master updated: [SPARK-39368][SQL] Move `RewritePredicateSubquery` into `InjectRuntimeFilter`
This is an automated email from the ASF dual-hosted git repository.
yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bb51add5c79 [SPARK-39368][SQL] Move `RewritePredicateSubquery` into `InjectRuntimeFilter`
bb51add5c79 is described below
commit bb51add5c79558df863d37965603387d40cc4387
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Sat Jun 4 08:30:49 2022 +0800
[SPARK-39368][SQL] Move `RewritePredicateSubquery` into `InjectRuntimeFilter`
### What changes were proposed in this pull request?
This PR moves `RewritePredicateSubquery` into `InjectRuntimeFilter`.
### Why are the changes needed?
Reduce the number of `RewritePredicateSubquery` runs, since `spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled` is disabled by default. For example:
```
build/sbt "sql/testOnly *TPCDSQuerySuite"
```
Before this PR:
```
...
org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery 17978319 / 31026106 26 / 624
...
```
After this PR:
```
...
org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery 16680901 / 18994542 26 / 312
...
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #36755 from wangyum/RewritePredicateSubquery.
Authored-by: Yuming Wang <yu...@ebay.com>
Signed-off-by: Yuming Wang <yu...@ebay.com>
---
.../apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala | 8 +++++++-
.../scala/org/apache/spark/sql/execution/SparkOptimizer.scala | 3 +--
2 files changed, 8 insertions(+), 3 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 01c1786e05a..baaf82c00db 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -288,7 +288,13 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
case s: Subquery if s.correlated => plan
case _ if !conf.runtimeFilterSemiJoinReductionEnabled &&
!conf.runtimeFilterBloomFilterEnabled => plan
- case _ => tryInjectRuntimeFilter(plan)
+ case _ =>
+ val newPlan = tryInjectRuntimeFilter(plan)
+ if (conf.runtimeFilterSemiJoinReductionEnabled && !plan.fastEquals(newPlan)) {
+ RewritePredicateSubquery(newPlan)
+ } else {
+ newPlan
+ }
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 84e5975189b..0e7455009c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -52,8 +52,7 @@ class SparkOptimizer(
Batch("PartitionPruning", Once,
PartitionPruning) :+
Batch("InjectRuntimeFilter", FixedPoint(1),
- InjectRuntimeFilter,
- RewritePredicateSubquery) :+
+ InjectRuntimeFilter) :+
Batch("MergeScalarSubqueries", Once,
MergeScalarSubqueries) :+
Batch("Pushdown Filters from PartitionPruning", fixedPoint,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org