You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yu...@apache.org on 2022/06/24 03:14:52 UTC

[spark] branch master updated: [SPARK-39511][SQL] Enhance push down local limit 1 for right side of left semi/anti join if join condition is empty

This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cc732a262e [SPARK-39511][SQL] Enhance push down local limit 1 for right side of left semi/anti join if join condition is empty
0cc732a262e is described below

commit 0cc732a262ee3fe504bdad57b077d23a1a5d2287
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Fri Jun 24 11:14:28 2022 +0800

    [SPARK-39511][SQL] Enhance push down local limit 1 for right side of left semi/anti join if join condition is empty
    
    ### What changes were proposed in this pull request?
    
    This PR enhances https://github.com/apache/spark/pull/35216 to support more cases.
    
    Before this PR, it only supported a left semi/anti join followed by a limit:
    ```sql
    SELECT * FROM t1 LEFT SEMI JOIN t2 LIMIT 10;
    SELECT * FROM t1 LEFT ANTI JOIN t2 LIMIT 10;
    ```
    After this PR, it does not have this limitation and also supports IN / NOT IN subqueries:
    ```sql
    SELECT * FROM t1 LEFT SEMI JOIN t2;
    SELECT * FROM t1 LEFT ANTI JOIN t2;
    
    SELECT * FROM v1 WHERE literal IN (SELECT id FROM t2);
    SELECT * FROM v1 WHERE literal NOT IN (SELECT id FROM t2);
    ```
    
    ### Why are the changes needed?
    
    Improve query performance. For example:
    ```sql
    CREATE TABLE t1(id int) using parquet;
    CREATE TABLE t2(id int, type string) using parquet;
    CREATE TEMP VIEW v1 AS SELECT id, 't' AS type FROM t1;
    
    EXPLAIN EXTENDED SELECT * FROM v1 WHERE type IN (SELECT type FROM t2);
    ```
    
    Before this PR:
    ```
    === Result of Batch RewriteSubquery ===
     Project [id#241, t AS type#246]                            Project [id#241, t AS type#246]
    !+- Filter t IN (list#243 [])                               +- Join LeftSemi, (t = type#248)
    !   :  +- Project [type#248]                                   :- Relation default.t1[id#241] parquet
    !   :     +- Relation default.t2[id#247,type#248] parquet      +- Project [type#248]
    !   +- Relation default.t1[id#241] parquet                        +- Relation default.t2[id#247,type#248] parquet
    ```
    
    After this PR:
    ```
    === Result of Batch RewriteSubquery ===
     Project [id#241, t AS type#246]                            Project [id#241, t AS type#246]
    !+- Filter t IN (list#243 [])                               +- Join LeftSemi
    !   :  +- Project [type#248]                                   :- Relation default.t1[id#241] parquet
    !   :     +- Relation default.t2[id#247,type#248] parquet      +- GlobalLimit 1
    !   +- Relation default.t1[id#241] parquet                        +- LocalLimit 1
    !                                                                    +- Project
    !                                                                       +- Filter (t = type#248)
    !                                                                          +- Relation default.t2[id#247,type#248] parquet
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #36909 from wangyum/SPARK-39511.
    
    Lead-authored-by: Yuming Wang <yu...@ebay.com>
    Co-authored-by: Yuming Wang <wg...@gmail.com>
    Signed-off-by: Yuming Wang <yu...@ebay.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 11 ++++++----
 .../catalyst/optimizer/LimitPushdownSuite.scala    | 24 +++++++++++++++++++++-
 .../scala/org/apache/spark/sql/SubquerySuite.scala | 18 +++++++++++++++-
 3 files changed, 47 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 4304f475655..eda42a9adbe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -235,6 +235,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
       CheckCartesianProducts) :+
     Batch("RewriteSubquery", Once,
       RewritePredicateSubquery,
+      PushPredicateThroughJoin,
+      LimitPushDown,
       ColumnPruning,
       CollapseProject,
       RemoveRedundantAliases,
@@ -710,15 +712,13 @@ object LimitPushDown extends Rule[LogicalPlan] {
           left = maybePushLocalLimit(limitExpr, join.left),
           right = maybePushLocalLimit(limitExpr, join.right))
       case LeftSemi | LeftAnti if join.condition.isEmpty =>
-        join.copy(
-          left = maybePushLocalLimit(limitExpr, join.left),
-          right = maybePushLocalLimit(Literal(1, IntegerType), join.right))
+        join.copy(left = maybePushLocalLimit(limitExpr, join.left))
       case _ => join
     }
   }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
-    _.containsPattern(LIMIT), ruleId) {
+    _.containsAnyPattern(LIMIT, LEFT_SEMI_OR_ANTI_JOIN), ruleId) {
     // Adding extra Limits below UNION ALL for children which are not Limit or do not have Limit
     // descendants whose maxRow is larger. This heuristic is valid assuming there does not exist any
     // Limit push-down rule that is unable to infer the value of maxRows.
@@ -753,6 +753,9 @@ object LimitPushDown extends Rule[LogicalPlan] {
     // Merge offset value and limit value into LocalLimit and pushes down LocalLimit through Offset.
     case LocalLimit(le, Offset(oe, grandChild)) =>
       Offset(oe, LocalLimit(Add(le, oe), grandChild))
+    // Push down local limit 1 if join type is LeftSemiOrAnti and join condition is empty.
+    case j @ Join(_, right, LeftSemiOrAnti(_), None, _) if !right.maxRows.exists(_ <= 1) =>
+      j.copy(right = maybePushLocalLimit(Literal(1, IntegerType), right))
   }
 }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
index 1f19ac77c94..9c093bda263 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.Add
-import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, PlanTest, RightOuter}
+import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 
@@ -276,4 +276,26 @@ class LimitPushdownSuite extends PlanTest {
       Optimize.execute(testRelation.offset(2).limit(1).analyze),
       GlobalLimit(1, Offset(2, LocalLimit(3, testRelation))).analyze)
   }
+
+  test("SPARK-39511: Push limit 1 to right side if join type is LeftSemiOrAnti") {
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      comparePlans(
+        Optimize.execute(x.join(y, joinType).analyze),
+        x.join(LocalLimit(1, y), joinType).analyze)
+    }
+
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      comparePlans(
+        Optimize.execute(x.join(y.limit(2), joinType).analyze),
+        x.join(LocalLimit(1, y), joinType).analyze)
+    }
+
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      val originalQuery1 = x.join(LocalLimit(1, y), joinType).analyze
+      val originalQuery2 = x.join(y.limit(1), joinType).analyze
+
+      comparePlans(Optimize.execute(originalQuery1), originalQuery1)
+      comparePlans(Optimize.execute(originalQuery2), originalQuery2)
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index b1057fd14bc..fa24e8d175b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort}
-import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, FileSourceScanExec, InputAdapter, ReusedSubqueryExec, ScalarSubquery, SubqueryExec, WholeStageCodegenExec}
+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecution}
 import org.apache.spark.sql.execution.datasources.FileScanRDD
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -2204,4 +2204,20 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
             |""".stripMargin),
       Row("2022-06-01"))
   }
+
+  test("SPARK-39511: Push limit 1 to right side if join type is Left Semi/Anti") {
+    withTable("t1", "t2") {
+      withTempView("v1") {
+        spark.sql("CREATE TABLE t1(id int) using parquet")
+        spark.sql("CREATE TABLE t2(id int, type string) using parquet")
+        spark.sql("CREATE TEMP VIEW v1 AS SELECT id, 't' AS type FROM t1")
+        val df = spark.sql("SELECT * FROM v1 WHERE type IN (SELECT type FROM t2)")
+        val join =
+          df.queryExecution.sparkPlan.collectFirst { case b: BroadcastNestedLoopJoinExec => b }
+        assert(join.nonEmpty)
+        assert(join.head.right.isInstanceOf[LocalLimitExec])
+        assert(join.head.right.asInstanceOf[LocalLimitExec].limit === 1)
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org