Posted to commits@spark.apache.org by we...@apache.org on 2019/04/03 01:58:10 UTC

[spark] branch master updated: [SPARK-19712][SQL][FOLLOW-UP] Don't do partial pushdown when pushing down LeftAnti joins below Aggregate or Window operators.

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b8b5acd  [SPARK-19712][SQL][FOLLOW-UP] Don't do partial pushdown when pushing down LeftAnti joins below Aggregate or Window operators.
b8b5acd is described below

commit b8b5acdd417f28c60c784159253a8974fa738904
Author: Dilip Biswal <db...@us.ibm.com>
AuthorDate: Wed Apr 3 09:56:27 2019 +0800

    [SPARK-19712][SQL][FOLLOW-UP] Don't do partial pushdown when pushing down LeftAnti joins below Aggregate or Window operators.
    
    ## What changes were proposed in this pull request?
    After [23750](https://github.com/apache/spark/pull/23750), we may push down left anti joins below aggregate and window operators even when only part of the join condition is pushable. This is not correct, as pointed out by hvanhovell and cloud-fan [here](https://github.com/apache/spark/pull/23750#discussion_r270017097). This PR addresses their comments.
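    
    A minimal, self-contained sketch (hypothetical tables and session setup, not part of this patch) of why a partial pushdown breaks left anti join semantics:
    
    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.sum
    
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._
    
    val left  = Seq((2, 3)).toDF("b", "c")   // a single group: b = 2, sum(c) = 3
    val right = Seq(2).toDF("d")
    
    left.groupBy($"b").agg(sum($"c").as("sum"))
      .join(right, $"b" === $"d" && $"sum" === 10, "left_anti")
      .show()
    // Correct result: the row (b = 2, sum = 3) is kept, because no right row
    // satisfies the entire condition (sum === 10 is false for this group).
    // Anti-joining on b === d alone below the aggregate would remove the
    // b = 2 input rows first and incorrectly return an empty result.
    ```
    
    Hence the rule now pushes a left anti join down only when the entire join condition can be pushed with it.
    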
    ## How was this patch tested?
    Added three new tests to verify the behaviour.
    
    Closes #24253 from dilipbiswal/SPARK-19712-followup.
    
    Authored-by: Dilip Biswal <db...@us.ibm.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../optimizer/PushDownLeftSemiAntiJoin.scala       | 35 +++++++++++++++--
 .../optimizer/LeftSemiAntiJoinPushDownSuite.scala  | 44 ++++++++++++++++++++--
 2 files changed, 73 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala
index bc868df..afe2cfa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala
@@ -82,7 +82,18 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
           val newAgg = agg.copy(child = Join(agg.child, rightOp, joinType, Option(replaced), hint))
           // If there is no more filter to stay up, just return the Aggregate over Join.
           // Otherwise, create "Filter(stayUp) <- Aggregate <- Join(pushDownPredicate)".
-          if (stayUp.isEmpty) newAgg else Filter(stayUp.reduce(And), newAgg)
+          if (stayUp.isEmpty) {
+            newAgg
+          } else {
+            joinType match {
+              // For a left semi join, the part of the join condition that does not refer to
+              // the child attributes of the aggregate is kept as a Filter above the aggregate.
+              case LeftSemi => Filter(stayUp.reduce(And), newAgg)
+              // For a left anti join, the join is pushed down only when the entire join
+              // condition can be pushed down, to preserve left anti join semantics.
+              case _ => join
+            }
+          }
         } else {
           // The join condition is not a subset of the Aggregate's GROUP BY columns,
           // no push down.
@@ -114,7 +125,18 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
         if (pushDown.nonEmpty && rightOpColumns.isEmpty) {
           val predicate = pushDown.reduce(And)
           val newPlan = w.copy(child = Join(w.child, rightOp, joinType, Option(predicate), hint))
-          if (stayUp.isEmpty) newPlan else Filter(stayUp.reduce(And), newPlan)
+          if (stayUp.isEmpty) {
+            newPlan
+          } else {
+            joinType match {
+              // For a left semi join, the part of the join condition that does not refer to
+              // the window's partition attributes is kept as a Filter above the window.
+              case LeftSemi => Filter(stayUp.reduce(And), newPlan)
+              // For a left anti join, the join is pushed down only when the entire join
+              // condition can be pushed down, to preserve left anti join semantics.
+              case _ => join
+            }
+          }
         } else {
           // The join condition is not a subset of the Window's PARTITION BY clause,
           // no push down.
@@ -184,7 +206,14 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (pushDown.nonEmpty && rightOpColumns.isEmpty) {
         val newChild = insertJoin(Option(pushDown.reduceLeft(And)))
         if (stayUp.nonEmpty) {
-          Filter(stayUp.reduceLeft(And), newChild)
+          join.joinType match {
+            // For a left semi join, the part of the join condition that does not refer to
+            // the grandchild's attributes is kept as a Filter above this unary node.
+            case LeftSemi => Filter(stayUp.reduce(And), newChild)
+            // For a left anti join, the join is pushed down only when the entire join
+            // condition can be pushed down, to preserve left anti join semantics.
+            case _ => join
+          }
         } else {
           newChild
         }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala
index 1a0231e..185568d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala
@@ -117,7 +117,7 @@ class LeftSemiPushdownSuite extends PlanTest {
     comparePlans(optimized, originalQuery.analyze)
   }
 
-  test("Aggregate: LeftSemiAnti join partial pushdown") {
+  test("Aggregate: LeftSemi join partial pushdown") {
     val originalQuery = testRelation
       .groupBy('b)('b, sum('c).as('sum))
       .join(testRelation1, joinType = LeftSemi, condition = Some('b === 'd && 'sum === 10))
@@ -132,6 +132,15 @@ class LeftSemiPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("Aggregate: LeftAnti join no pushdown") {
+    val originalQuery = testRelation
+      .groupBy('b)('b, sum('c).as('sum))
+      .join(testRelation1, joinType = LeftAnti, condition = Some('b === 'd && 'sum === 10))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    comparePlans(optimized, originalQuery.analyze)
+  }
+
   test("LeftSemiAnti join over aggregate - no pushdown") {
     val originalQuery = testRelation
       .groupBy('b)('b, sum('c).as('sum))
@@ -174,7 +183,7 @@ class LeftSemiPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
-  test("Window: LeftSemiAnti partial pushdown") {
+  test("Window: LeftSemi partial pushdown") {
    // Parts of the join condition that do not refer to the window partition spec
    // are kept in the plan as a Filter operator above the Window.
     val winExpr = windowExpr(count('b), windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
@@ -195,6 +204,25 @@ class LeftSemiPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("Window: LeftAnti no pushdown") {
+    // The join is not pushed below the Window: part of the join condition ('b > 5)
+    // does not refer to the partition spec, and left anti join allows no partial pushdown.
+    val winExpr = windowExpr(count('b), windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
+
+    val originalQuery = testRelation
+      .select('a, 'b, 'c, winExpr.as('window))
+      .join(testRelation1, joinType = LeftAnti, condition = Some('a === 'd && 'b > 5))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val correctAnswer = testRelation
+      .select('a, 'b, 'c)
+      .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
+      .join(testRelation1, joinType = LeftAnti, condition = Some('a === 'd && 'b > 5))
+      .select('a, 'b, 'c, 'window).analyze
+    comparePlans(optimized, correctAnswer)
+  }
+
   test("Union: LeftSemiAnti join pushdown") {
     val testRelation2 = LocalRelation('x.int, 'y.int, 'z.int)
 
@@ -251,7 +279,7 @@ class LeftSemiPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
-  test("Unary: LeftSemiAnti join pushdown - partial pushdown") {
+  test("Unary: LeftSemi join pushdown - partial pushdown") {
     val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))
     val originalQuery = testRelationWithArrayType
       .generate(Explode('c_arr), alias = Some("arr"), outputNames = Seq("out_col"))
@@ -267,6 +295,16 @@ class LeftSemiPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("Unary: LeftAnti join pushdown - no pushdown") {
+    val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))
+    val originalQuery = testRelationWithArrayType
+      .generate(Explode('c_arr), alias = Some("arr"), outputNames = Seq("out_col"))
+      .join(testRelation1, joinType = LeftAnti, condition = Some('b === 'd && 'b === 'out_col))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    comparePlans(optimized, originalQuery.analyze)
+  }
+
   test("Unary: LeftSemiAnti join pushdown - no pushdown") {
     val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))
     val originalQuery = testRelationWithArrayType

