Posted to commits@spark.apache.org by we...@apache.org on 2022/06/14 00:00:46 UTC
[spark] branch master updated: [SPARK-39441][SQL] Speed up DeduplicateRelations
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 00199e3964b [SPARK-39441][SQL] Speed up DeduplicateRelations
00199e3964b is described below
commit 00199e3964bbad4a830f7062e52bf03c180677cd
Author: allisonwang-db <al...@databricks.com>
AuthorDate: Tue Jun 14 08:00:31 2022 +0800
[SPARK-39441][SQL] Speed up DeduplicateRelations
### What changes were proposed in this pull request?
This PR improves the performance of the Analyzer rule `DeduplicateRelations`. Instead of creating a new HashSet in every recursive call and merging the collected relations back up to the caller, it threads a single global HashSet through the recursion to track the relations visited so far.
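As a minimal sketch of the pattern (a toy tree stands in for `LogicalPlan`; the names and structure below are illustrative, not Spark's actual code):
```
import scala.collection.mutable

sealed trait Node
case class Leaf(id: Int) extends Node
case class Branch(children: Seq[Node]) extends Node

object SharedSetSketch {
  // Before: every call allocates its own HashSet and the caller merges
  // the returned sets, paying an allocation plus a union at each level.
  def collectPerCall(node: Node): mutable.HashSet[Int] = node match {
    case Leaf(id) => mutable.HashSet(id)
    case Branch(cs) =>
      val acc = mutable.HashSet.empty[Int]
      for (c <- cs) acc ++= collectPerCall(c) // repeated allocation + union
      acc
  }

  // After: a single set is threaded through the recursion and mutated
  // in place; there is nothing to merge on the way back up.
  def collectShared(node: Node, seen: mutable.HashSet[Int]): Unit = node match {
    case Leaf(id)   => seen += id
    case Branch(cs) => cs.foreach(collectShared(_, seen))
  }

  def main(args: Array[String]): Unit = {
    val tree = Branch(Seq(Leaf(1), Branch(Seq(Leaf(2), Leaf(1)))))
    val seen = mutable.HashSet.empty[Int]
    collectShared(tree, seen)
    println(seen) // HashSet(1, 2)
  }
}
```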
### Why are the changes needed?
Improve Analyzer performance. Here are the rule's timings from TPCDSQuerySuite (effective time / total time in nanoseconds, effective runs / total runs):
```
// Before this PR
org.apache.spark.sql.catalyst.analysis.DeduplicateRelations 269181786 / 1028666896 124 / 1302
// After this PR
org.apache.spark.sql.catalyst.analysis.DeduplicateRelations 144507333 / 643810447 124 / 1302
```
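The numbers above follow the format of Spark's rule-execution metering (`RuleExecutor.dumpTimeSpent()`). As a sketch of how to reproduce this kind of report, assuming a `spark-shell` session with `spark` in scope:
```
import org.apache.spark.sql.catalyst.rules.RuleExecutor

RuleExecutor.resetMetrics()                      // clear accumulated rule timings
spark.range(1000).selectExpr("id * 2").collect() // run any query to exercise the Analyzer
// Prints one line per rule: effective time / total time (ns) and
// effective runs / total runs -- the two column pairs shown above.
println(RuleExecutor.dumpTimeSpent())
```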
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests.
Closes #36837 from allisonwang-db/spark-39441-speed-up-dedup-rel.
Authored-by: allisonwang-db <al...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../catalyst/analysis/DeduplicateRelations.scala | 25 ++++++++--------------
1 file changed, 9 insertions(+), 16 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
index aed19f2499f..1ffe421e19b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
@@ -90,36 +90,31 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
* @param existingRelations the known unique relations for a LogicalPlan
* @param plan the LogicalPlan that requires the deduplication
* @return (the new LogicalPlan which already deduplicate all duplicated relations (if any),
- * all relations of the new LogicalPlan, whether the plan is changed or not)
+ * whether the plan is changed or not)
*/
private def renewDuplicatedRelations(
existingRelations: mutable.HashSet[ReferenceEqualPlanWrapper],
- plan: LogicalPlan)
- : (LogicalPlan, mutable.HashSet[ReferenceEqualPlanWrapper], Boolean) = plan match {
- case p: LogicalPlan if p.isStreaming => (plan, mutable.HashSet.empty, false)
+ plan: LogicalPlan): (LogicalPlan, Boolean) = plan match {
+ case p: LogicalPlan if p.isStreaming => (plan, false)
case m: MultiInstanceRelation =>
val planWrapper = ReferenceEqualPlanWrapper(m)
if (existingRelations.contains(planWrapper)) {
val newNode = m.newInstance()
newNode.copyTagsFrom(m)
- (newNode, mutable.HashSet.empty, true)
+ (newNode, true)
} else {
- val mWrapper = new mutable.HashSet[ReferenceEqualPlanWrapper]()
- mWrapper.add(planWrapper)
- (m, mWrapper, false)
+ existingRelations.add(planWrapper)
+ (m, false)
}
case plan: LogicalPlan =>
- val relations = new mutable.HashSet[ReferenceEqualPlanWrapper]()
var planChanged = false
val newPlan = if (plan.children.nonEmpty) {
val newChildren = mutable.ArrayBuffer.empty[LogicalPlan]
for (c <- plan.children) {
- val (renewed, collected, changed) =
- renewDuplicatedRelations(existingRelations ++ relations, c)
+ val (renewed, changed) = renewDuplicatedRelations(existingRelations, c)
newChildren += renewed
- relations ++= collected
if (changed) {
planChanged = true
}
@@ -127,9 +122,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
val planWithNewSubquery = plan.transformExpressions {
case subquery: SubqueryExpression =>
- val (renewed, collected, changed) = renewDuplicatedRelations(
- existingRelations ++ relations, subquery.plan)
- relations ++= collected
+ val (renewed, changed) = renewDuplicatedRelations(existingRelations, subquery.plan)
if (changed) planChanged = true
subquery.withNewPlan(renewed)
}
@@ -157,7 +150,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
} else {
plan
}
- (newPlan, relations, planChanged)
+ (newPlan, planChanged)
}
/**
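Note that `existingRelations` keys plans by reference identity rather than structural equality: two structurally identical relations that are distinct objects are exactly the duplicates this rule must renew. A minimal sketch of such a wrapper (`ReferenceEqualPlanWrapper` plays this role in Spark, but the class below is an illustration, not its exact definition):
```
import scala.collection.mutable

// Wraps a value so HashSet membership is decided by reference identity (eq)
// rather than structural equality (==).
class RefEqWrapper[T <: AnyRef](val value: T) {
  override def hashCode(): Int = System.identityHashCode(value)
  override def equals(other: Any): Boolean = other match {
    case w: RefEqWrapper[_] => w.value eq value
    case _                  => false
  }
}

object RefEqDemo {
  def main(args: Array[String]): Unit = {
    val a = List(1, 2)
    val b = List(1, 2) // == a, but a different object
    val seen = mutable.HashSet(new RefEqWrapper(a))
    println(seen.contains(new RefEqWrapper(a))) // true: same reference
    println(seen.contains(new RefEqWrapper(b))) // false: equal, but not the same object
  }
}
```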