You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/02/26 11:46:53 UTC

[spark] branch master updated: [SPARK-34533][SQL] Eliminate LEFT ANTI join to empty relation in AQE

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d5021f  [SPARK-34533][SQL] Eliminate LEFT ANTI join to empty relation in AQE
7d5021f is described below

commit 7d5021f5eed2b9c48bd02b92cce1535edc46d0e4
Author: Cheng Su <ch...@fb.com>
AuthorDate: Fri Feb 26 11:46:27 2021 +0000

    [SPARK-34533][SQL] Eliminate LEFT ANTI join to empty relation in AQE
    
    ### What changes were proposed in this pull request?
    
    I discovered from review discussion - https://github.com/apache/spark/pull/31630#discussion_r581774000 , that we can eliminate LEFT ANTI join (with no join condition) to empty relation, if the right side is known to be non-empty. So with AQE, this is doable similar to https://github.com/apache/spark/pull/29484 .
    
    ### Why are the changes needed?
    
    This can help eliminate the join operator during logical plan optimization.
    Before this PR, [left side physical plan `execute()` will be called](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala#L192), so if left side is complicated (e.g. contain broadcast exchange operator), then some computation would happen. However after this PR, the join operator will be removed during logical plan, and nothing will be computed from left side. Potentially it can save resource for the [...]
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added unit tests for positive and negative queries in `AdaptiveQueryExecSuite.scala`.
    
    Closes #31641 from c21/left-anti-aqe.
    
    Authored-by: Cheng Su <ch...@fb.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../adaptive/EliminateJoinToEmptyRelation.scala    | 16 +++++++++++++++-
 .../adaptive/AdaptiveQueryExecSuite.scala          | 22 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
index cfdd20e..d6df522 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.adaptive
 
 import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
-import org.apache.spark.sql.catalyst.plans.{Inner, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftAnti, LeftSemi}
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation, HashedRelationWithAllNullKeys}
@@ -33,6 +33,8 @@ import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation
  *    This applies to all Joins (sort merge join, shuffled hash join, and broadcast hash join),
  *    because sort merge join and shuffled hash join will be changed to broadcast hash join with AQE
  *    at the first place.
+ *
+ * 3. Join is left anti join without condition, and join right side is non-empty.
  */
 object EliminateJoinToEmptyRelation extends Rule[LogicalPlan] {
 
@@ -53,5 +55,17 @@ object EliminateJoinToEmptyRelation extends Rule[LogicalPlan] {
 
     case j @ Join(_, _, LeftSemi, _, _) if canEliminate(j.right, EmptyHashedRelation) =>
       LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)
+
+    case j @ Join(_, _, LeftAnti, None, _) =>
+      val isNonEmptyRightSide = j.right match {
+        case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined =>
+          stage.getRuntimeStatistics.rowCount.exists(_ > 0)
+        case _ => false
+      }
+      if (isNonEmptyRightSide) {
+        LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)
+      } else {
+        j
+      }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 122bc2d..d7a1d5d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1230,6 +1230,28 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-34533: Eliminate left anti join to empty relation") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      withTable("emptyTestData") {
+        spark.range(0).write.saveAsTable("emptyTestData")
+        Seq(
+          // broadcast non-empty right side
+          ("SELECT /*+ broadcast(testData3) */ * FROM testData LEFT ANTI JOIN testData3", true),
+          // broadcast empty right side
+          ("SELECT /*+ broadcast(emptyTestData) */ * FROM testData LEFT ANTI JOIN emptyTestData",
+            false),
+          // broadcast left side
+          ("SELECT /*+ broadcast(testData) */ * FROM testData LEFT ANTI JOIN testData3", false)
+        ).foreach { case (query, isEliminated) =>
+          val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(query)
+          assert(findTopLevelBaseJoin(plan).size == 1)
+          assert(findTopLevelBaseJoin(adaptivePlan).isEmpty == isEliminated)
+        }
+      }
+    }
+  }
+
   test("SPARK-32753: Only copy tags to node with no tags") {
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
       withTempView("v1") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org