Posted to commits@spark.apache.org by we...@apache.org on 2022/06/14 00:00:46 UTC

[spark] branch master updated: [SPARK-39441][SQL] Speed up DeduplicateRelations

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 00199e3964b [SPARK-39441][SQL] Speed up DeduplicateRelations
00199e3964b is described below

commit 00199e3964bbad4a830f7062e52bf03c180677cd
Author: allisonwang-db <al...@databricks.com>
AuthorDate: Tue Jun 14 08:00:31 2022 +0800

    [SPARK-39441][SQL] Speed up DeduplicateRelations
    
    ### What changes were proposed in this pull request?
    
    This PR improves the performance of the Analyzer rule `DeduplicateRelations`. Instead of allocating a fresh HashSet in each recursive call and merging the results back in the caller, it threads a single shared HashSet through the recursion to keep track of the visited relations, as sketched below.
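    
    To illustrate the shape of the change, here is a minimal sketch on a hypothetical `Node` tree (the names and structure are illustrative only, not the actual rule): the old recursion allocates and merges a set per call, while the new one mutates a single shared set.
    
    ```
    import scala.collection.mutable
    
    // Hypothetical miniature tree, only to illustrate the set-threading pattern.
    sealed trait Node
    case class Leaf(id: Int) extends Node
    case class Branch(children: Seq[Node]) extends Node
    
    // Old shape (sketch): each call returns a freshly allocated set, and the
    // caller pays for `existing ++ collected`, which copies the set per child.
    def visitOld(existing: mutable.HashSet[Int], n: Node): (mutable.HashSet[Int], Boolean) =
      n match {
        case Leaf(id) if existing.contains(id) => (mutable.HashSet.empty, true)
        case Leaf(id) => (mutable.HashSet(id), false)
        case Branch(cs) =>
          val collected = mutable.HashSet.empty[Int]
          var changed = false
          for (c <- cs) {
            val (got, ch) = visitOld(existing ++ collected, c) // copies on every child
            collected ++= got
            if (ch) changed = true
          }
          (collected, changed)
      }
    
    // New shape (sketch): one shared mutable set is threaded through the whole
    // recursion; first-seen ids are added in place and nothing is copied.
    def visitNew(seen: mutable.HashSet[Int], n: Node): Boolean = n match {
      case Leaf(id) if seen.contains(id) => true
      case Leaf(id) => seen.add(id); false
      case Branch(cs) => cs.map(visitNew(seen, _)).exists(identity)
    }
    ```
    
    Sharing one mutable set preserves the old behaviour here, because the per-call sets were always merged back into the working set (`relations ++= collected`, then `existingRelations ++ relations`) before visiting the next child anyway.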
    
    ### Why are the changes needed?
    
    Improve Analyzer performance. Here is the result of TPCDSQuerySuite (rule metrics: effective time / total time, then effective runs / total runs):
    ```
    // Before this PR
    org.apache.spark.sql.catalyst.analysis.DeduplicateRelations  269181786 / 1028666896   124 / 1302
    // After this PR
    org.apache.spark.sql.catalyst.analysis.DeduplicateRelations  144507333 / 643810447    124 / 1302
    ```
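    
    Assuming the times are in nanoseconds (the usual unit of the rule metrics), the effective time drops from ~269 ms to ~145 ms (~1.9x) and the total time from ~1029 ms to ~644 ms (~1.6x) for this rule.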
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing unit tests.
    
    Closes #36837 from allisonwang-db/spark-39441-speed-up-dedup-rel.
    
    Authored-by: allisonwang-db <al...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../catalyst/analysis/DeduplicateRelations.scala   | 25 ++++++++--------------
 1 file changed, 9 insertions(+), 16 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
index aed19f2499f..1ffe421e19b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
@@ -90,36 +90,31 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
    * @param existingRelations the known unique relations for a LogicalPlan
    * @param plan the LogicalPlan that requires the deduplication
    * @return (the new LogicalPlan which already deduplicate all duplicated relations (if any),
-   *         all relations of the new LogicalPlan, whether the plan is changed or not)
+   *          whether the plan is changed or not)
    */
   private def renewDuplicatedRelations(
       existingRelations: mutable.HashSet[ReferenceEqualPlanWrapper],
-      plan: LogicalPlan)
-    : (LogicalPlan, mutable.HashSet[ReferenceEqualPlanWrapper], Boolean) = plan match {
-    case p: LogicalPlan if p.isStreaming => (plan, mutable.HashSet.empty, false)
+      plan: LogicalPlan): (LogicalPlan, Boolean) = plan match {
+    case p: LogicalPlan if p.isStreaming => (plan, false)
 
     case m: MultiInstanceRelation =>
       val planWrapper = ReferenceEqualPlanWrapper(m)
       if (existingRelations.contains(planWrapper)) {
         val newNode = m.newInstance()
         newNode.copyTagsFrom(m)
-        (newNode, mutable.HashSet.empty, true)
+        (newNode, true)
       } else {
-        val mWrapper = new mutable.HashSet[ReferenceEqualPlanWrapper]()
-        mWrapper.add(planWrapper)
-        (m, mWrapper, false)
+        existingRelations.add(planWrapper)
+        (m, false)
       }
 
     case plan: LogicalPlan =>
-      val relations = new mutable.HashSet[ReferenceEqualPlanWrapper]()
       var planChanged = false
       val newPlan = if (plan.children.nonEmpty) {
         val newChildren = mutable.ArrayBuffer.empty[LogicalPlan]
         for (c <- plan.children) {
-          val (renewed, collected, changed) =
-            renewDuplicatedRelations(existingRelations ++ relations, c)
+          val (renewed, changed) = renewDuplicatedRelations(existingRelations, c)
           newChildren += renewed
-          relations ++= collected
           if (changed) {
             planChanged = true
           }
@@ -127,9 +122,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
 
         val planWithNewSubquery = plan.transformExpressions {
           case subquery: SubqueryExpression =>
-            val (renewed, collected, changed) = renewDuplicatedRelations(
-              existingRelations ++ relations, subquery.plan)
-            relations ++= collected
+            val (renewed, changed) = renewDuplicatedRelations(existingRelations, subquery.plan)
             if (changed) planChanged = true
             subquery.withNewPlan(renewed)
         }
@@ -157,7 +150,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
       } else {
         plan
       }
-      (newPlan, relations, planChanged)
+      (newPlan, planChanged)
   }
 
   /**

