Posted to commits@spark.apache.org by yu...@apache.org on 2022/06/24 03:14:52 UTC
[spark] branch master updated: [SPARK-39511][SQL] Enhance push down local limit 1 for right side of left semi/anti join if join condition is empty
This is an automated email from the ASF dual-hosted git repository.
yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0cc732a262e [SPARK-39511][SQL] Enhance push down local limit 1 for right side of left semi/anti join if join condition is empty
0cc732a262e is described below
commit 0cc732a262ee3fe504bdad57b077d23a1a5d2287
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Fri Jun 24 11:14:28 2022 +0800
[SPARK-39511][SQL] Enhance push down local limit 1 for right side of left semi/anti join if join condition is empty
### What changes were proposed in this pull request?
This PR enhances https://github.com/apache/spark/pull/35216 to support more cases.
Before this PR, it only supported a left semi/anti join directly followed by a limit:
```sql
SELECT * FROM t1 LEFT SEMI JOIN t2 LIMIT 10;
SELECT * FROM t1 LEFT ANTI JOIN t2 LIMIT 10;
```
After this PR, that limitation is removed, and IN / NOT IN subqueries are supported as well:
```sql
SELECT * FROM t1 LEFT SEMI JOIN t2;
SELECT * FROM t1 LEFT ANTI JOIN t2;
SELECT * FROM v1 WHERE literal IN (SELECT id FROM t2);
SELECT * FROM v1 WHERE literal NOT IN (SELECT id FROM t2);
```
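The rewrite is safe because, with no join condition, a left semi (or anti) join's result depends only on whether the right side is empty, never on which right-side rows exist. A minimal spark-shell sketch of that intuition (the view names `l` and `r` are made up for illustration and are not part of this change):
```scala
// Illustrative spark-shell sketch; the view names l and r are hypothetical.
spark.range(0, 3).createOrReplaceTempView("l")
spark.range(0, 1000000).createOrReplaceTempView("r")

// With no join condition, LEFT SEMI keeps every row of l iff r is non-empty,
// and LEFT ANTI keeps every row of l iff r is empty ...
spark.sql("SELECT * FROM l LEFT SEMI JOIN r").show()
// ... so probing a single row of r answers the same question.
spark.sql("SELECT * FROM l LEFT SEMI JOIN (SELECT * FROM r LIMIT 1) s").show()
```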
### Why are the changes needed?
Improve query performance. For example:
```sql
CREATE TABLE t1(id int) using parquet;
CREATE TABLE t2(id int, type string) using parquet;
CREATE TEMP VIEW v1 AS SELECT id, 't' AS type FROM t1;
EXPLAIN EXTENDED SELECT * FROM v1 WHERE type IN (SELECT type FROM t2);
```
Before this PR:
```
=== Result of Batch RewriteSubquery ===
Project [id#241, t AS type#246] Project [id#241, t AS type#246]
!+- Filter t IN (list#243 []) +- Join LeftSemi, (t = type#248)
! : +- Project [type#248] :- Relation default.t1[id#241] parquet
! : +- Relation default.t2[id#247,type#248] parquet +- Project [type#248]
! +- Relation default.t1[id#241] parquet +- Relation default.t2[id#247,type#248] parquet
```
After this PR:
```
=== Result of Batch RewriteSubquery ===
Project [id#241, t AS type#246] Project [id#241, t AS type#246]
!+- Filter t IN (list#243 []) +- Join LeftSemi
! : +- Project [type#248] :- Relation default.t1[id#241] parquet
! : +- Relation default.t2[id#247,type#248] parquet +- GlobalLimit 1
! +- Relation default.t1[id#241] parquet +- LocalLimit 1
! +- Project
! +- Filter (t = type#248)
! +- Relation default.t2[id#247,type#248] parquet
```
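For reference, a hedged spark-shell version of the same reproduction (table and view names follow the SQL above); it prints the optimized plan so the pushed `LocalLimit 1` on the t2 side of the LeftSemi join can be checked directly:
```scala
// spark-shell sketch; table/view names follow the SQL in this description.
spark.sql("CREATE TABLE t1(id int) USING parquet")
spark.sql("CREATE TABLE t2(id int, type string) USING parquet")
spark.sql("CREATE TEMP VIEW v1 AS SELECT id, 't' AS type FROM t1")
val df = spark.sql("SELECT * FROM v1 WHERE type IN (SELECT type FROM t2)")
// With this change the LeftSemi join's right (t2) side should appear under
// a LocalLimit 1 in the optimized plan.
println(df.queryExecution.optimizedPlan.treeString)
```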
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
Closes #36909 from wangyum/SPARK-39511.
Lead-authored-by: Yuming Wang <yu...@ebay.com>
Co-authored-by: Yuming Wang <wg...@gmail.com>
Signed-off-by: Yuming Wang <yu...@ebay.com>
---
.../spark/sql/catalyst/optimizer/Optimizer.scala | 11 ++++++----
.../catalyst/optimizer/LimitPushdownSuite.scala | 24 +++++++++++++++++++++-
.../scala/org/apache/spark/sql/SubquerySuite.scala | 18 +++++++++++++++-
3 files changed, 47 insertions(+), 6 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 4304f475655..eda42a9adbe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -235,6 +235,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
CheckCartesianProducts) :+
Batch("RewriteSubquery", Once,
RewritePredicateSubquery,
+ PushPredicateThroughJoin,
+ LimitPushDown,
ColumnPruning,
CollapseProject,
RemoveRedundantAliases,
@@ -710,15 +712,13 @@ object LimitPushDown extends Rule[LogicalPlan] {
left = maybePushLocalLimit(limitExpr, join.left),
right = maybePushLocalLimit(limitExpr, join.right))
case LeftSemi | LeftAnti if join.condition.isEmpty =>
- join.copy(
- left = maybePushLocalLimit(limitExpr, join.left),
- right = maybePushLocalLimit(Literal(1, IntegerType), join.right))
+ join.copy(left = maybePushLocalLimit(limitExpr, join.left))
case _ => join
}
}
def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
- _.containsPattern(LIMIT), ruleId) {
+ _.containsAnyPattern(LIMIT, LEFT_SEMI_OR_ANTI_JOIN), ruleId) {
// Adding extra Limits below UNION ALL for children which are not Limit or do not have Limit
// descendants whose maxRow is larger. This heuristic is valid assuming there does not exist any
// Limit push-down rule that is unable to infer the value of maxRows.
@@ -753,6 +753,9 @@ object LimitPushDown extends Rule[LogicalPlan] {
// Merge offset value and limit value into LocalLimit and pushes down LocalLimit through Offset.
case LocalLimit(le, Offset(oe, grandChild)) =>
Offset(oe, LocalLimit(Add(le, oe), grandChild))
+ // Push down local limit 1 if join type is LeftSemiOrAnti and join condition is empty.
+ case j @ Join(_, right, LeftSemiOrAnti(_), None, _) if !right.maxRows.exists(_ <= 1) =>
+ j.copy(right = maybePushLocalLimit(Literal(1, IntegerType), right))
}
}
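Read on its own, the new matching case above amounts to the following sketch of a Catalyst rule (a simplified illustration, not the actual LimitPushDown code: the object name is hypothetical, and plain `transform` stands in for the pruned `transformWithPruning` / `maybePushLocalLimit` machinery):
```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalLimit, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.IntegerType

// Hypothetical rule name; the real logic lives inside LimitPushDown.
object PushLimitOneToSemiAntiRight extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
    // No join condition: the result only depends on whether the right side
    // is empty, so reading one right-side row is enough. Skip children that
    // already produce at most one row.
    case j @ Join(_, right, LeftSemi | LeftAnti, None, _)
        if !right.maxRows.exists(_ <= 1) =>
      j.copy(right = LocalLimit(Literal(1, IntegerType), right))
  }
}
```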
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
index 1f19ac77c94..9c093bda263 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Add
-import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, PlanTest, RightOuter}
+import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -276,4 +276,26 @@ class LimitPushdownSuite extends PlanTest {
Optimize.execute(testRelation.offset(2).limit(1).analyze),
GlobalLimit(1, Offset(2, LocalLimit(3, testRelation))).analyze)
}
+
+ test("SPARK-39511: Push limit 1 to right side if join type is LeftSemiOrAnti") {
+ Seq(LeftSemi, LeftAnti).foreach { joinType =>
+ comparePlans(
+ Optimize.execute(x.join(y, joinType).analyze),
+ x.join(LocalLimit(1, y), joinType).analyze)
+ }
+
+ Seq(LeftSemi, LeftAnti).foreach { joinType =>
+ comparePlans(
+ Optimize.execute(x.join(y.limit(2), joinType).analyze),
+ x.join(LocalLimit(1, y), joinType).analyze)
+ }
+
+ Seq(LeftSemi, LeftAnti).foreach { joinType =>
+ val originalQuery1 = x.join(LocalLimit(1, y), joinType).analyze
+ val originalQuery2 = x.join(y.limit(1), joinType).analyze
+
+ comparePlans(Optimize.execute(originalQuery1), originalQuery1)
+ comparePlans(Optimize.execute(originalQuery2), originalQuery2)
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index b1057fd14bc..fa24e8d175b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort}
-import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, FileSourceScanExec, InputAdapter, ReusedSubqueryExec, ScalarSubquery, SubqueryExec, WholeStageCodegenExec}
+import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecution}
import org.apache.spark.sql.execution.datasources.FileScanRDD
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -2204,4 +2204,20 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
|""".stripMargin),
Row("2022-06-01"))
}
+
+ test("SPARK-39511: Push limit 1 to right side if join type is Left Semi/Anti") {
+ withTable("t1", "t2") {
+ withTempView("v1") {
+ spark.sql("CREATE TABLE t1(id int) using parquet")
+ spark.sql("CREATE TABLE t2(id int, type string) using parquet")
+ spark.sql("CREATE TEMP VIEW v1 AS SELECT id, 't' AS type FROM t1")
+ val df = spark.sql("SELECT * FROM v1 WHERE type IN (SELECT type FROM t2)")
+ val join =
+ df.queryExecution.sparkPlan.collectFirst { case b: BroadcastNestedLoopJoinExec => b }
+ assert(join.nonEmpty)
+ assert(join.head.right.isInstanceOf[LocalLimitExec])
+ assert(join.head.right.asInstanceOf[LocalLimitExec].limit === 1)
+ }
+ }
+ }
}