Posted to commits@spark.apache.org by we...@apache.org on 2022/01/19 05:24:07 UTC

[spark] branch master updated: [SPARK-37917][SQL] Push down limit 1 for right side of left semi/anti join if join condition is empty

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 61abae36 [SPARK-37917][SQL] Push down limit 1 for right side of left semi/anti join if join condition is empty
61abae36 is described below

commit 61abae36eaccfa0ccb6bd2916e28164a96207e34
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Wed Jan 19 13:23:15 2022 +0800

    [SPARK-37917][SQL] Push down limit 1 for right side of left semi/anti join if join condition is empty
    
    ### What changes were proposed in this pull request?
    
    It is safe to push down `LIMIT 1` to the right side of a left semi/anti join when the join condition is empty, since the join only needs to know whether the right side is empty. For example:
    ```scala
    val numRows = 1024 * 1024 * 40
    
    spark.sql(s"CREATE TABLE t1 using parquet AS SELECT id AS a, id AS b, id AS c FROM range(1, ${numRows}L, 1, 5)")
    spark.sql(s"CREATE TABLE t2 using parquet AS SELECT id AS a, id AS b, id AS c FROM range(1, ${numRows}L, 1, 5)")
    
    spark.sql("SELECT * FROM t1 LEFT SEMI JOIN t2 LIMIT 5").explain(true)
    ```
    
    Before this PR:
    ```
    == Optimized Logical Plan ==
    GlobalLimit 5
    +- LocalLimit 5
       +- Join LeftSemi
          :- LocalLimit 5
          :  +- Relation default.t1[a#8L,b#9L,c#10L] parquet
          +- Project
             +- Relation default.t2[a#11L,b#12L,c#13L] parquet
    ```
    
    After this PR:
    ```
    == Optimized Logical Plan ==
    GlobalLimit 5
    +- LocalLimit 5
       +- Join LeftSemi
          :- LocalLimit 5
          :  +- Relation default.t1[a#8L,b#9L,c#10L] parquet
          +- LocalLimit 1
             +- Project
                +- Relation default.t2[a#11L,b#12L,c#13L] parquet
    ```
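    The push-down is sound because a condition-free semi/anti join observes nothing about the right side except whether it is non-empty. A minimal collection-level sketch of this reasoning (hypothetical helper names, not Spark APIs):

```scala
// Hypothetical model of LEFT SEMI / LEFT ANTI join with an empty join
// condition: every left row survives iff the right side is non-empty
// (semi) or empty (anti). No right-side values are ever inspected.
def leftSemiNoCondition[A, B](left: Seq[A], right: Seq[B]): Seq[A] =
  if (right.nonEmpty) left else Seq.empty

def leftAntiNoCondition[A, B](left: Seq[A], right: Seq[B]): Seq[A] =
  if (right.isEmpty) left else Seq.empty

val left  = Seq(1, 2, 3)
val right = Seq("a", "b", "c")

// Truncating the right side to one row cannot change the result,
// which is exactly what makes the LocalLimit 1 push-down safe.
assert(leftSemiNoCondition(left, right) == leftSemiNoCondition(left, right.take(1)))
assert(leftAntiNoCondition(left, right) == leftAntiNoCondition(left, right.take(1)))
```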
    
    ### Why are the changes needed?
    
    Improve query performance.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #35216 from wangyum/SPARK-37917.
    
    Authored-by: Yuming Wang <yu...@ebay.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala     | 4 +++-
 .../org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala  | 4 ++--
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 1c2f0af..357d11c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -684,7 +684,9 @@ object LimitPushDown extends Rule[LogicalPlan] {
           left = maybePushLocalLimit(limitExpr, join.left),
           right = maybePushLocalLimit(limitExpr, join.right))
       case LeftSemi | LeftAnti if join.condition.isEmpty =>
-        join.copy(left = maybePushLocalLimit(limitExpr, join.left))
+        join.copy(
+          left = maybePushLocalLimit(limitExpr, join.left),
+          right = maybePushLocalLimit(Literal(1, IntegerType), join.right))
       case _ => join
     }
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
index 848416b..ee7f872 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
@@ -216,9 +216,9 @@ class LimitPushdownSuite extends PlanTest {
   test("SPARK-34514: Push down limit through LEFT SEMI and LEFT ANTI join") {
     // Push down when condition is empty
     Seq(LeftSemi, LeftAnti).foreach { joinType =>
-      val originalQuery = x.join(y, joinType).limit(1)
+      val originalQuery = x.join(y, joinType).limit(5)
       val optimized = Optimize.execute(originalQuery.analyze)
-      val correctAnswer = Limit(1, LocalLimit(1, x).join(y, joinType)).analyze
+      val correctAnswer = Limit(5, LocalLimit(5, x).join(LocalLimit(1, y), joinType)).analyze
       comparePlans(optimized, correctAnswer)
     }
 

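The Optimizer.scala change reuses `maybePushLocalLimit`, passing a constant `Literal(1, IntegerType)` as the limit expression for the right side. A simplified, self-contained sketch of that decision over a toy plan algebra (hypothetical `Plan`/`Relation`/`LocalLimit` types, not Spark's real classes):

```scala
// Toy plan algebra, purely to illustrate the shape of the rule.
sealed trait Plan
case class Relation(name: String) extends Plan
case class LocalLimit(limit: Int, child: Plan) extends Plan

// Sketch of the maybePushLocalLimit idea: wrap the plan in a LocalLimit
// unless an equal-or-smaller limit is already in place.
def maybePushLocalLimit(limit: Int, plan: Plan): Plan = plan match {
  case LocalLimit(existing, _) if existing <= limit => plan
  case _ => LocalLimit(limit, plan)
}

// The right side of a condition-free semi/anti join gets LocalLimit 1.
assert(maybePushLocalLimit(1, Relation("t2")) == LocalLimit(1, Relation("t2")))
// An already-smaller limit is left untouched.
assert(maybePushLocalLimit(5, LocalLimit(1, Relation("t2"))) == LocalLimit(1, Relation("t2")))
```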