You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/02/16 03:43:36 UTC

[GitHub] dilipbiswal commented on a change in pull request #23750: [SPARK-19712][SQL] Pushing Left Semi and Left Anti joins through Project, Aggregate, Window, Union etc.

dilipbiswal commented on a change in pull request #23750: [SPARK-19712][SQL] Pushing Left Semi and Left Anti joins through Project, Aggregate, Window, Union etc.
URL: https://github.com/apache/spark/pull/23750#discussion_r257444587
 
 

 ##########
 File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ##########
 @@ -1188,6 +1189,190 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
   }
 }
 
+object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    // Similar to the above Filter over Project
+    // LeftSemi/LeftAnti over Project
+    case join @ Join(p @ Project(pList, gChild), rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
+      if pList.forall(_.deterministic) && !ScalarSubquery.hasScalarSubquery(pList) &&
+        canPushThroughCondition(Seq(gChild), joinCond, rightOp) =>
+      if (joinCond.isEmpty) {
+        // No join condition, just push down the Join below Project
+        Project(pList, Join(gChild, rightOp, joinType, joinCond, hint))
+      } else {
+        // Create a map of Aliases to their values from the child projection.
+        // e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b).
+        val aliasMap = AttributeMap(pList.collect {
+          case a: Alias => (a.toAttribute, a.child)
+        })
+        val newJoinCond = if (aliasMap.nonEmpty) {
+          Option(replaceAlias(joinCond.get, aliasMap))
+        } else {
+          joinCond
+        }
+        Project(pList, Join(gChild, rightOp, joinType, newJoinCond, hint))
+      }
+
+    // Similar to the above Filter over Aggregate
+    // LeftSemi/LeftAnti over Aggregate
+    case join @ Join(aggregate: Aggregate, rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
+      if aggregate.aggregateExpressions.forall(_.deterministic)
+        && aggregate.groupingExpressions.nonEmpty =>
+      if (joinCond.isEmpty) {
+        // No join condition, just push down Join below Aggregate
+        aggregate.copy(child = Join(aggregate.child, rightOp, joinType, joinCond, hint))
+      } else {
+        // Find all the aliased expressions in the aggregate list that don't include any actual
+        // AggregateExpression, and create a map from the alias to the expression
+        val aliasMap = AttributeMap(aggregate.aggregateExpressions.collect {
+          case a: Alias if a.child.find(_.isInstanceOf[AggregateExpression]).isEmpty =>
+            (a.toAttribute, a.child)
+        })
+
+        // For each join condition, expand the alias and
+        // check if the condition can be evaluated using
+        // attributes produced by the aggregate operator's child operator.
+
+        val (pushDown, stayUp) = splitConjunctivePredicates(joinCond.get).partition { cond =>
+          val replaced = replaceAlias(cond, aliasMap)
+          cond.references.nonEmpty &&
 
 Review comment:
    @maropu Thanks for reviewing. I have addressed your comments. Please look through it when you get a chance. Thanks.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org