Posted to commits@spark.apache.org by yu...@apache.org on 2022/08/31 14:26:59 UTC

[spark] branch master updated: [SPARK-40040][SQL] Push local limit to both sides if join condition is empty

This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e1338235323 [SPARK-40040][SQL] Push local limit to both sides if join condition is empty
e1338235323 is described below

commit e13382353232828f16da264ec9462b5bbc8de8b6
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Wed Aug 31 22:26:23 2022 +0800

    [SPARK-40040][SQL] Push local limit to both sides if join condition is empty
    
    ### What changes were proposed in this pull request?
    
    Similar to https://github.com/apache/spark/pull/31567. This PR enhances `LimitPushDown` to push the local limit to both sides of an outer join when the join condition is empty. The pushdown is safe because a join without a join condition is effectively a cross join.
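    
    For illustration only (not part of this patch): a minimal, self-contained sketch of a query
    shape the enhanced rule targets. The object name and temp-view names below are made up; only
    the plan shape matters, i.e. an outer join with no join condition under a LIMIT.
    
    ```scala
    import org.apache.spark.sql.SparkSession
    
    object Spark40040Sketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("SPARK-40040 sketch")
          .getOrCreate()
        import spark.implicits._
    
        Seq(1, 2, 3).toDF("a").createOrReplaceTempView("l")
        Seq(4, 5, 6).toDF("b").createOrReplaceTempView("r")
    
        // A LEFT OUTER join without an ON clause behaves like a cross join, so with this
        // change the optimizer is expected to place a LocalLimit(2) below both join children.
        val df = spark.sql("SELECT * FROM l LEFT JOIN r LIMIT 2")
        df.explain(true) // inspect the optimized logical plan for the pushed LocalLimit
        spark.stop()
      }
    }
    ```
    
    Checking `df.queryExecution.optimizedPlan` should show a `LocalLimit 2` on both sides of the
    join once this rule applies, whereas previously only the left side of a LEFT OUTER join was
    limited.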
    
    ### Why are the changes needed?
    
    Improve query performance. For example:
    Benchmark screenshot: https://user-images.githubusercontent.com/5399861/184052707-ebf50748-6870-4650-84c3-65d79b18ba9d.png
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #37475 from wangyum/SPARK-40040.
    
    Lead-authored-by: Yuming Wang <yu...@ebay.com>
    Co-authored-by: Yuming Wang <wg...@gmail.com>
    Signed-off-by: Yuming Wang <yu...@ebay.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  22 ++--
 .../catalyst/optimizer/EliminateSortsSuite.scala   |   2 +-
 .../catalyst/optimizer/LimitPushdownSuite.scala    | 134 +++++++++++++++------
 3 files changed, 107 insertions(+), 51 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 8a18d55cb64..44ffc4f7495 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -719,9 +719,11 @@ object LimitPushDown extends Rule[LogicalPlan] {
 
   private def pushLocalLimitThroughJoin(limitExpr: Expression, join: Join): Join = {
     join.joinType match {
-      case RightOuter => join.copy(right = maybePushLocalLimit(limitExpr, join.right))
-      case LeftOuter => join.copy(left = maybePushLocalLimit(limitExpr, join.left))
-      case _: InnerLike if join.condition.isEmpty =>
+      case RightOuter if join.condition.nonEmpty =>
+        join.copy(right = maybePushLocalLimit(limitExpr, join.right))
+      case LeftOuter if join.condition.nonEmpty =>
+        join.copy(left = maybePushLocalLimit(limitExpr, join.left))
+      case _: InnerLike | RightOuter | LeftOuter | FullOuter if join.condition.isEmpty =>
         join.copy(
           left = maybePushLocalLimit(limitExpr, join.left),
           right = maybePushLocalLimit(limitExpr, join.right))
@@ -743,15 +745,15 @@ object LimitPushDown extends Rule[LogicalPlan] {
       LocalLimit(exp, u.copy(children = u.children.map(maybePushLocalLimit(exp, _))))
 
     // Add extra limits below JOIN:
-    // 1. For LEFT OUTER and RIGHT OUTER JOIN, we push limits to the left and right sides,
-    //    respectively.
-    // 2. For INNER and CROSS JOIN, we push limits to both the left and right sides if join
-    //    condition is empty.
+    // 1. For LEFT OUTER and RIGHT OUTER JOIN, we push limits to the left and right sides
+    //    respectively if join condition is not empty.
+    // 2. For INNER, CROSS JOIN and OUTER JOIN, we push limits to both the left and right sides if
+    //    join condition is empty.
     // 3. For LEFT SEMI and LEFT ANTI JOIN, we push limits to the left side if join condition
     //    is empty.
-    // It's not safe to push limits below FULL OUTER JOIN in the general case without a more
-    // invasive rewrite. We also need to ensure that this limit pushdown rule will not eventually
-    // introduce limits on both sides if it is applied multiple times. Therefore:
+    // It's not safe to push limits below FULL OUTER JOIN with join condition in the general case
+    // without a more invasive rewrite. We also need to ensure that this limit pushdown rule will
+    // not eventually introduce limits on both sides if it is applied multiple times. Therefore:
     //   - If one side is already limited, stack another limit on top if the new limit is smaller.
     //     The redundant limit will be collapsed by the CombineLimits rule.
     case LocalLimit(exp, join: Join) =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
index f309da6b4f5..7cbc308182c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
@@ -372,7 +372,7 @@ class EliminateSortsSuite extends AnalysisTest {
       .limit(10)
     val optimized = Optimize.execute(joinPlan.analyze)
     val correctAnswer = LocalLimit(10, projectPlan)
-      .join(projectPlanB, LeftOuter)
+      .join(LocalLimit(10, projectPlanB), LeftOuter)
       .limit(10).analyze
     comparePlans(optimized, correctAnswer)
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
index 9c093bda263..02631c4cf61 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
@@ -96,45 +96,75 @@ class LimitPushdownSuite extends PlanTest {
   // Outer join ----------------------------------------------------------------------------------
 
   test("left outer join") {
-    val originalQuery = x.join(y, LeftOuter).limit(1)
-    val optimized = Optimize.execute(originalQuery.analyze)
-    val correctAnswer = Limit(1, LocalLimit(1, x).join(y, LeftOuter)).analyze
-    comparePlans(optimized, correctAnswer)
+    Seq(Some("x.a".attr === "y.b".attr), None).foreach { condition =>
+      val originalQuery = x.join(y, LeftOuter, condition).limit(1).analyze
+      val optimized = if (condition.isEmpty) {
+        LocalLimit(1, x).join(LocalLimit(1, y), LeftOuter, condition).limit(1).analyze
+      } else {
+        LocalLimit(1, x).join(y, LeftOuter, condition).limit(1).analyze
+      }
+      comparePlans(Optimize.execute(originalQuery), optimized)
+    }
   }
 
   test("left outer join and left sides are limited") {
-    val originalQuery = x.limit(2).join(y, LeftOuter).limit(1)
-    val optimized = Optimize.execute(originalQuery.analyze)
-    val correctAnswer = Limit(1, LocalLimit(1, x).join(y, LeftOuter)).analyze
-    comparePlans(optimized, correctAnswer)
+    Seq(Some("x.a".attr === "y.b".attr), None).foreach { condition =>
+      val originalQuery = x.limit(2).join(y, LeftOuter, condition).limit(1).analyze
+      val optimized = if (condition.isEmpty) {
+        LocalLimit(1, x).join(LocalLimit(1, y), LeftOuter, condition).limit(1).analyze
+      } else {
+        LocalLimit(1, x).join(y, LeftOuter, condition).limit(1).analyze
+      }
+      comparePlans(Optimize.execute(originalQuery), optimized)
+    }
   }
 
   test("left outer join and right sides are limited") {
-    val originalQuery = x.join(y.limit(2), LeftOuter).limit(1)
-    val optimized = Optimize.execute(originalQuery.analyze)
-    val correctAnswer = Limit(1, LocalLimit(1, x).join(Limit(2, y), LeftOuter)).analyze
-    comparePlans(optimized, correctAnswer)
+    Seq(Some("x.a".attr === "y.b".attr), None).foreach { condition =>
+      val originalQuery = x.join(y.limit(2), LeftOuter, condition).limit(1).analyze
+      val optimized = if (condition.isEmpty) {
+        LocalLimit(1, x).join(LocalLimit(1, y), LeftOuter, condition).limit(1).analyze
+      } else {
+        LocalLimit(1, x).join(Limit(2, y), LeftOuter, condition).limit(1).analyze
+      }
+      comparePlans(Optimize.execute(originalQuery), optimized)
+    }
   }
 
   test("right outer join") {
-    val originalQuery = x.join(y, RightOuter).limit(1)
-    val optimized = Optimize.execute(originalQuery.analyze)
-    val correctAnswer = Limit(1, x.join(LocalLimit(1, y), RightOuter)).analyze
-    comparePlans(optimized, correctAnswer)
+    Seq(Some("x.a".attr === "y.b".attr), None).foreach { condition =>
+      val originalQuery = x.join(y, RightOuter, condition).limit(1).analyze
+      val optimized = if (condition.isEmpty) {
+        LocalLimit(1, x).join(LocalLimit(1, y), RightOuter, condition).limit(1).analyze
+      } else {
+        x.join(LocalLimit(1, y), RightOuter, condition).limit(1).analyze
+      }
+      comparePlans(Optimize.execute(originalQuery), optimized)
+    }
   }
 
   test("right outer join and right sides are limited") {
-    val originalQuery = x.join(y.limit(2), RightOuter).limit(1)
-    val optimized = Optimize.execute(originalQuery.analyze)
-    val correctAnswer = Limit(1, x.join(LocalLimit(1, y), RightOuter)).analyze
-    comparePlans(optimized, correctAnswer)
+    Seq(Some("x.a".attr === "y.b".attr), None).foreach { condition =>
+      val originalQuery = x.join(y.limit(2), RightOuter, condition).limit(1).analyze
+      val optimized = if (condition.isEmpty) {
+        LocalLimit(1, x).join(LocalLimit(1, y), RightOuter, condition).limit(1).analyze
+      } else {
+        x.join(LocalLimit(1, y), RightOuter, condition).limit(1).analyze
+      }
+      comparePlans(Optimize.execute(originalQuery), optimized)
+    }
   }
 
   test("right outer join and left sides are limited") {
-    val originalQuery = x.limit(2).join(y, RightOuter).limit(1)
-    val optimized = Optimize.execute(originalQuery.analyze)
-    val correctAnswer = Limit(1, Limit(2, x).join(LocalLimit(1, y), RightOuter)).analyze
-    comparePlans(optimized, correctAnswer)
+    Seq(Some("x.a".attr === "y.b".attr), None).foreach { condition =>
+      val originalQuery = x.limit(2).join(y, RightOuter, condition).limit(1).analyze
+      val optimized = if (condition.isEmpty) {
+        LocalLimit(1, x).join(LocalLimit(1, y), RightOuter, condition).limit(1).analyze
+      } else {
+        Limit(2, x).join(LocalLimit(1, y), RightOuter, condition).limit(1).analyze
+      }
+      comparePlans(Optimize.execute(originalQuery), optimized)
+    }
   }
 
   test("larger limits are not pushed on top of smaller ones in right outer join") {
@@ -146,35 +176,59 @@ class LimitPushdownSuite extends PlanTest {
 
   test("full outer join where neither side is limited and both sides have same statistics") {
     assert(x.stats.sizeInBytes === y.stats.sizeInBytes)
-    val originalQuery = x.join(y, FullOuter).limit(1).analyze
-    val optimized = Optimize.execute(originalQuery)
-    // No pushdown for FULL OUTER JOINS.
-    comparePlans(optimized, originalQuery)
+    Seq(Some("x.a".attr === "y.b".attr), None).foreach { condition =>
+      val originalQuery = x.join(y, FullOuter, condition).limit(1).analyze
+      val optimized = if (condition.isEmpty) {
+        LocalLimit(1, x).join(LocalLimit(1, y), FullOuter, condition).limit(1).analyze
+      } else {
+        // No pushdown for FULL OUTER JOINS.
+        originalQuery
+      }
+      comparePlans(Optimize.execute(originalQuery), optimized)
+    }
   }
 
   test("full outer join where neither side is limited and left side has larger statistics") {
     val xBig = testRelation.copy(data = Seq.fill(10)(null)).subquery("x")
     assert(xBig.stats.sizeInBytes > y.stats.sizeInBytes)
-    val originalQuery = xBig.join(y, FullOuter).limit(1).analyze
-    val optimized = Optimize.execute(originalQuery)
-    // No pushdown for FULL OUTER JOINS.
-    comparePlans(optimized, originalQuery)
+    Seq(Some("x.a".attr === "y.b".attr), None).foreach { condition =>
+      val originalQuery = xBig.join(y, FullOuter, condition).limit(1).analyze
+      val optimized = if (condition.isEmpty) {
+        LocalLimit(1, xBig).join(LocalLimit(1, y), FullOuter, condition).limit(1).analyze
+      } else {
+        // No pushdown for FULL OUTER JOINS.
+        originalQuery
+      }
+      comparePlans(Optimize.execute(originalQuery), optimized)
+    }
   }
 
   test("full outer join where neither side is limited and right side has larger statistics") {
     val yBig = testRelation.copy(data = Seq.fill(10)(null)).subquery("y")
     assert(x.stats.sizeInBytes < yBig.stats.sizeInBytes)
-    val originalQuery = x.join(yBig, FullOuter).limit(1).analyze
-    val optimized = Optimize.execute(originalQuery)
-    // No pushdown for FULL OUTER JOINS.
-    comparePlans(optimized, originalQuery)
+    Seq(Some("x.a".attr === "y.b".attr), None).foreach { condition =>
+      val originalQuery = x.join(yBig, FullOuter, condition).limit(1).analyze
+      val optimized = if (condition.isEmpty) {
+        LocalLimit(1, x).join(LocalLimit(1, yBig), FullOuter, condition).limit(1).analyze
+      } else {
+        // No pushdown for FULL OUTER JOINS.
+        originalQuery
+      }
+      comparePlans(Optimize.execute(originalQuery), optimized)
+    }
   }
 
   test("full outer join where both sides are limited") {
-    val originalQuery = x.limit(2).join(y.limit(2), FullOuter).limit(1).analyze
-    val optimized = Optimize.execute(originalQuery)
-    // No pushdown for FULL OUTER JOINS.
-    comparePlans(optimized, originalQuery)
+    Seq(Some("x.a".attr === "y.b".attr), None).foreach { condition =>
+      val originalQuery = x.limit(2).join(y.limit(2), FullOuter, condition).limit(1).analyze
+      val optimized = if (condition.isEmpty) {
+        LocalLimit(1, x).join(LocalLimit(1, y), FullOuter, condition).limit(1).analyze
+      } else {
+        // No pushdown for FULL OUTER JOINS.
+        originalQuery
+      }
+      comparePlans(Optimize.execute(originalQuery), optimized)
+    }
   }
 
   test("SPARK-33433: Change Aggregate max rows to 1 if grouping is empty") {

