You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/04/10 07:21:11 UTC

[GitHub] [spark] dilipbiswal commented on a change in pull request #24331: [SPARK-19712][SQL] Pushdown LeftSemi/LeftAnti below join

dilipbiswal commented on a change in pull request #24331: [SPARK-19712][SQL] Pushdown LeftSemi/LeftAnti below join
URL: https://github.com/apache/spark/pull/24331#discussion_r273820079
 
 

 ##########
 File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala
 ##########
 @@ -159,3 +159,106 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 }
+
+/**
+ * This rule is a variant of [[PushPredicateThroughJoin]] which can handle
+ * pushing down LeftSemi and LeftAnti joins below a join operator. The
+ * allowable types of the join being pushed through are:
+ *  1) Inner
+ *  2) Cross
+ *  3) LeftOuter
+ *  4) RightOuter
+ */
+object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Enumeration of the possible outcomes when deciding where a LeftSemi/LeftAnti join can be
+   * pushed relative to the join below it:
+   *  - `toLeftBranch`: push it down to the left leg of the join,
+   *  - `toRightBranch`: push it down to the right leg of the join,
+   *  - `none`: do not push it down.
+   */
+  object pushdownDirection extends Enumeration {
+    val toRightBranch, toLeftBranch, none = Value
+  }
+
+  /**
+   * Extractor for the join types through which a LeftSemi/LeftAnti join may be pushed.
+   * A LeftSemi/LeftAnti join is pushed down only when its left child is a join operator
+   * whose join type is Inner, Cross, LeftOuter or RightOuter. For those types `unapply`
+   * returns the join type wrapped in `Some`; for every other join type it returns `None`.
+   */
+  object AllowedJoinTypes {
+    def unapply(joinType: JoinType): Option[JoinType] = joinType match {
+      case Inner | Cross | LeftOuter | RightOuter => Some(joinType)
+      case _ => None
+    }
+  }
+
+  /**
+   * Determine which side of the join below a LeftSemi/LeftAnti join can be pushed to.
+   *
+   * When a condition is present it is split into conjuncts, which are classified as:
+   *  - left conditions: reference only the output of the left leg and of `rightChild`,
+   *  - right conditions: of the remainder, reference only the output of the right leg
+   *    and of `rightChild`,
+   *  - common conditions: everything else.
+   * The join is pushed to the left leg only if every conjunct is a left condition, and to
+   * the right leg only if there are no left or common conditions and at least one right
+   * condition. When no condition is present, the type of the join below decides.
+   *
+   * @param leftChild the join whose left and right legs are the candidate pushdown targets
+   * @param rightChild the plan whose output attributes the condition may reference in
+   *                   addition to those of the chosen leg
+   * @param joinCond the condition of the LeftSemi/LeftAnti join, if any
+   * @return a pair of the pushdown direction and the condition for the pushed-down join;
+   *         the condition is `None` when `joinCond` is empty or when nothing is pushed down
+   */
+  private def pushTo(leftChild: Join, rightChild: LogicalPlan, joinCond: Option[Expression]) = {
+    val left = leftChild.left
+    val right = leftChild.right
+    val joinType = leftChild.joinType
+    val rightOutput = rightChild.outputSet
+
+    if (joinCond.nonEmpty) {
+      val noPushdown = (pushdownDirection.none, None)
+      val conditions = splitConjunctivePredicates(joinCond.get)
+      val (leftConditions, rest) =
+        conditions.partition(_.references.subsetOf(left.outputSet ++ rightOutput))
+      val (rightConditions, commonConditions) =
+        rest.partition(_.references.subsetOf(right.outputSet ++ rightOutput))
+
+      if (rest.isEmpty && leftConditions.nonEmpty) {
+        // Every conjunct references only the left leg of the join below plus the right
+        // child of the leftsemi/anti join, so push the leftsemi/anti join to the left leg.
+        (pushdownDirection.toLeftBranch, leftConditions.reduceLeftOption(And))
+      } else if (leftConditions.isEmpty && rightConditions.nonEmpty && commonConditions.isEmpty) {
+        // No conjunct fits the left leg and every conjunct references only the right leg
+        // plus the right child of the leftsemi/anti join, so push it to the right leg.
+        (pushdownDirection.toRightBranch, rightConditions.reduceLeftOption(And))
+      } else {
+        noPushdown
+      }
+    } else {
+      /**
+       * When the leftsemi/anti join has no condition, the direction depends only on the
+       * type of the join below:
+       * 1) right outer join: push the leftsemi/anti join to the right leg;
+       * 2) inner-like or left outer join: push it to the left leg;
+       * 3) any other join type: do not push down.
+       */
+      val action = joinType match {
+        case RightOuter =>
+          pushdownDirection.toRightBranch
+        case _: InnerLike | LeftOuter =>
+          pushdownDirection.toLeftBranch
+        case _ =>
+          pushdownDirection.none
+      }
+      (action, None)
+    }
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    // push LeftSemi/LeftAnti down into the join below
+    case j @ Join(left @ Join(gLeft, gRight, AllowedJoinTypes(_), belowJoinCond, childHint),
+    right, LeftSemiOrAnti(joinType), joinCond, parentHint) =>
+      val belowJoinType = left.joinType
+      val (action, newJoinCond) = pushTo(left, right, joinCond)
+
+      action match {
+        case pushdownDirection.toLeftBranch
+          if (belowJoinType == LeftOuter || belowJoinType.isInstanceOf[InnerLike]) =>
+          // push down leftsemi/anti join to the left table
+          val newLeft = Join(gLeft, right, joinType, newJoinCond, parentHint)
 
 Review comment:
   @cloud-fan I have a question here. Will it be safe to propagate the hints here? I am inclined to only do this optimization if no join hints are specified in either the parent or the child join. Currently I am propagating them as is, but I am thinking of changing it. I wanted to check your opinion before I make the change.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org