Posted to commits@spark.apache.org by do...@apache.org on 2023/03/22 17:32:03 UTC

[spark] branch master updated: [SPARK-42832][SQL] Remove repartition if it is the child of LocalLimit

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f9c105e1b69 [SPARK-42832][SQL] Remove repartition if it is the child of LocalLimit
f9c105e1b69 is described below

commit f9c105e1b693760d8c904066fdb65630aa4aeb91
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Wed Mar 22 10:31:45 2023 -0700

    [SPARK-42832][SQL] Remove repartition if it is the child of LocalLimit
    
    ### What changes were proposed in this pull request?
    
    This PR enhances `CollapseRepartition` to remove a repartition when it is the child of a `LocalLimit`. Because the output of the repartition is determined only by the number of partitions and its partitioning expressions, it is feasible to remove it; the exception is repartitioning by nondeterministic expressions, which is kept because users may expect to take data randomly.
    For example:
    ```sql
    SELECT /*+ REBALANCE */ * FROM t WHERE id > 1 LIMIT 5;
    ```
    
    Before this PR:
    ```
    == Optimized Logical Plan ==
    GlobalLimit 5
    +- LocalLimit 5
       +- RebalancePartitions
          +- Filter (isnotnull(id#0L) AND (id#0L > 1))
             +- Relation spark_catalog.default.t[id#0L] parquet
    ```
    
    After this PR:
    ```
    == Optimized Logical Plan ==
    GlobalLimit 5
    +- LocalLimit 5
       +- Filter (isnotnull(id#0L) AND (id#0L > 1))
          +- Relation spark_catalog.default.t[id#0L] parquet
    ```
    
    Note that we don't remove the repartition if it looks like the user might want to take data randomly. For example:
    ```sql
    SELECT /*+ REPARTITION(3) */ * FROM t WHERE id > 1 LIMIT 5;
    SELECT * FROM t WHERE id > 1 DISTRIBUTE BY random() LIMIT 5;
    ```
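
    As a rough illustration, here is a spark-shell style sketch (assuming a table `t` with a numeric `id` column, as in the examples above) that checks the optimized plan in both situations; it is not part of this PR:
    ```scala
    // Sketch only: `t` and `id` are the example table/column from this description.
    // The rebalance case: with this change the shuffle node should no longer appear.
    val removed = spark.sql("SELECT /*+ REBALANCE */ * FROM t WHERE id > 1 LIMIT 5")
    println(removed.queryExecution.optimizedPlan.treeString)

    // The nondeterministic case: the repartition is kept, so the shuffle node remains.
    val kept = spark.sql("SELECT * FROM t WHERE id > 1 DISTRIBUTE BY random() LIMIT 5")
    println(kept.queryExecution.optimizedPlan.treeString)
    ```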
    
    ### Why are the changes needed?
    
    This reduces shuffles and improves query performance. The use case is adding a repartition to improve the read parallelism of a JDBC table:
    <img src="https://user-images.githubusercontent.com/5399861/225855582-c3c81c7d-4617-4104-b669-76749a7468a0.png" width="400" height="700">
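
    A hypothetical spark-shell style sketch of that use case (the JDBC URL, table and column names are placeholders, not taken from this PR):
    ```scala
    // Hypothetical JDBC read; URL, table and column names are placeholders.
    import org.apache.spark.sql.functions.col

    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://host:5432/db")
      .option("dbtable", "t")
      .load()

    // repartition(10, col("id")) is added to boost parallelism after the JDBC read;
    // when a LIMIT follows, this change removes the now-unnecessary shuffle
    // from the optimized plan.
    jdbcDF.repartition(10, col("id")).limit(5).explain(true)
    ```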
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #40462 from wangyum/SPARK-42832.
    
    Authored-by: Yuming Wang <yu...@ebay.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala       |  9 +++++++++
 .../catalyst/optimizer/CollapseRepartitionSuite.scala  | 18 ++++++++++++++++++
 2 files changed, 27 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 13d1ee31a22..3e8571f3eb6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1212,6 +1212,15 @@ object CollapseRepartition extends Rule[LogicalPlan] {
     // child.
     case r @ RebalancePartitions(_, child: RebalancePartitions, _, _) =>
       r.withNewChildren(child.children)
+    // Case 5: When a LocalLimit has a Repartition child, the Repartition can be removed
+    // because its output only redistributes rows according to the number of partitions and
+    // the partitioning expressions. Repartitions by nondeterministic expressions are kept,
+    // because users may expect to take data randomly.
+    case l @ LocalLimit(_, r: RepartitionByExpression)
+        if r.partitionExpressions.nonEmpty && r.partitionExpressions.forall(_.deterministic) =>
+      l.copy(child = r.child)
+    case l @ LocalLimit(_, r: RebalancePartitions) =>
+      l.copy(child = r.child)
   }
 }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala
index f9eb6d2e760..7c8a90fe23e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.Uuid
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -222,4 +223,21 @@ class CollapseRepartitionSuite extends PlanTest {
       comparePlans(optimized, expected)
     }
   }
+
+  test("SPARK-42832: Remove repartition if it is the child of LocalLimit") {
+    Seq(testRelation.distribute($"a")(2),
+      testRelation.rebalance(),
+      testRelation.rebalance($"a")).foreach { repartition =>
+      comparePlans(
+        Optimize.execute(repartition.limit(3).analyze),
+        testRelation.limit(3).analyze)
+    }
+
+    // In this case, do not remove repartition, the user may want to randomly take data.
+    Seq(testRelation.distribute()(2),
+      testRelation.distribute(Uuid())(2)).foreach { repartition =>
+      val plan = repartition.limit(3).analyze
+      comparePlans(Optimize.execute(plan), plan)
+    }
+  }
 }

