Posted to commits@spark.apache.org by we...@apache.org on 2022/01/19 05:24:07 UTC
[spark] branch master updated: [SPARK-37917][SQL] Push down limit 1 for right side of left semi/anti join if join condition is empty
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 61abae36 [SPARK-37917][SQL] Push down limit 1 for right side of left semi/anti join if join condition is empty
61abae36 is described below
commit 61abae36eaccfa0ccb6bd2916e28164a96207e34
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Wed Jan 19 13:23:15 2022 +0800
[SPARK-37917][SQL] Push down limit 1 for right side of left semi/anti join if join condition is empty
### What changes were proposed in this pull request?
It is safe to push down a limit of 1 to the right side of a left semi/anti join if the join condition is empty, since the only thing that matters is whether the right side is empty. For example:
```scala
val numRows = 1024 * 1024 * 40
spark.sql(s"CREATE TABLE t1 using parquet AS SELECT id AS a, id AS b, id AS c FROM range(1, ${numRows}L, 1, 5)")
spark.sql(s"CREATE TABLE t2 using parquet AS SELECT id AS a, id AS b, id AS c FROM range(1, ${numRows}L, 1, 5)")
spark.sql("SELECT * FROM t1 LEFT SEMI JOIN t2 LIMIT 5").explain(true)
```
Before this PR:
```
== Optimized Logical Plan ==
GlobalLimit 5
+- LocalLimit 5
+- Join LeftSemi
:- LocalLimit 5
: +- Relation default.t1[a#8L,b#9L,c#10L] parquet
+- Project
+- Relation default.t2[a#11L,b#12L,c#13L] parquet
```
After this PR:
```
== Optimized Logical Plan ==
GlobalLimit 5
+- LocalLimit 5
+- Join LeftSemi
:- LocalLimit 5
: +- Relation default.t1[a#8L,b#9L,c#10L] parquet
+- LocalLimit 1
+- Project
+- Relation default.t2[a#11L,b#12L,c#13L] parquet
```
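Why pushing `LocalLimit 1` into the right side is safe can be sketched outside of Spark. The following is a hypothetical standalone model (not Catalyst code): when the join condition is empty, LEFT SEMI keeps every left row iff the right side is non-empty, and LEFT ANTI keeps every left row iff the right side is empty, so a single right-side row is enough to decide either case.

```scala
// Model of condition-free left semi/anti join semantics (names invented).
def leftSemi[A, B](left: Seq[A], right: Seq[B]): Seq[A] =
  if (right.nonEmpty) left else Seq.empty

def leftAnti[A, B](left: Seq[A], right: Seq[B]): Seq[A] =
  if (right.isEmpty) left else Seq.empty

val left = Seq(1, 2, 3)
val right = Seq("a", "b", "c")

// Taking only the first right-side row (i.e. LocalLimit 1) cannot change
// the answer, because only emptiness of the right side is observed.
assert(leftSemi(left, right) == leftSemi(left, right.take(1)))
assert(leftAnti(left, right) == leftAnti(left, right.take(1)))
assert(leftSemi(left, Seq.empty[String]) == Seq.empty)
assert(leftAnti(left, Seq.empty[String]) == left)
```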
### Why are the changes needed?
Improve query performance.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
Closes #35216 from wangyum/SPARK-37917.
Authored-by: Yuming Wang <yu...@ebay.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 4 +++-
.../org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala | 4 ++--
2 files changed, 5 insertions(+), 3 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 1c2f0af..357d11c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -684,7 +684,9 @@ object LimitPushDown extends Rule[LogicalPlan] {
left = maybePushLocalLimit(limitExpr, join.left),
right = maybePushLocalLimit(limitExpr, join.right))
case LeftSemi | LeftAnti if join.condition.isEmpty =>
- join.copy(left = maybePushLocalLimit(limitExpr, join.left))
+ join.copy(
+ left = maybePushLocalLimit(limitExpr, join.left),
+ right = maybePushLocalLimit(Literal(1, IntegerType), join.right))
case _ => join
}
}
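The hunk above can be read as: when a limit sits on top of a LEFT SEMI/ANTI join with no condition, push the user's limit into the left side and a constant limit of 1 into the right side. A simplified, non-Spark sketch of that rewrite (the plan classes here are invented stand-ins, not the real Catalyst nodes):

```scala
// Hypothetical miniature plan tree mirroring the shape of the patch.
sealed trait Plan
case class Relation(name: String) extends Plan
case class LocalLimit(n: Int, child: Plan) extends Plan
case class Join(left: Plan, right: Plan, joinType: String,
                condition: Option[String]) extends Plan

// Push the user's limit to the left child and a constant 1 to the right,
// exactly as the patched LeftSemi/LeftAnti case does.
def pushLimitThroughLeftSemiAnti(limit: Int, join: Join): Join =
  join.joinType match {
    case "LeftSemi" | "LeftAnti" if join.condition.isEmpty =>
      join.copy(
        left = LocalLimit(limit, join.left),
        right = LocalLimit(1, join.right))
    case _ => join
  }

val j = Join(Relation("t1"), Relation("t2"), "LeftSemi", None)
val rewritten = pushLimitThroughLeftSemiAnti(5, j)
assert(rewritten.left == LocalLimit(5, Relation("t1")))
assert(rewritten.right == LocalLimit(1, Relation("t2")))
```

If the join has a condition, the rule leaves the plan untouched, since limiting the right side could then drop rows that match some left row.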
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
index 848416b..ee7f872 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
@@ -216,9 +216,9 @@ class LimitPushdownSuite extends PlanTest {
test("SPARK-34514: Push down limit through LEFT SEMI and LEFT ANTI join") {
// Push down when condition is empty
Seq(LeftSemi, LeftAnti).foreach { joinType =>
- val originalQuery = x.join(y, joinType).limit(1)
+ val originalQuery = x.join(y, joinType).limit(5)
val optimized = Optimize.execute(originalQuery.analyze)
- val correctAnswer = Limit(1, LocalLimit(1, x).join(y, joinType)).analyze
+ val correctAnswer = Limit(5, LocalLimit(5, x).join(LocalLimit(1, y), joinType)).analyze
comparePlans(optimized, correctAnswer)
}