You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by sameeragarwal <gi...@git.apache.org> on 2016/02/10 19:30:58 UTC

[GitHub] spark pull request: [SPARK-12594] [SQL] Outer Join Elimination by ...

Github user sameeragarwal commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10567#discussion_r52501622
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -769,6 +770,79 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
     }
     
     /**
    + * Elimination of outer joins, if the predicates can restrict the result sets so that
    + * all null-supplying rows are eliminated
    + *
    + * - full outer -> inner if both sides have such predicates
    + * - left outer -> inner if the right side has such predicates
    + * - right outer -> inner if the left side has such predicates
    + * - full outer -> left outer if only the left side has such predicates
    + * - full outer -> right outer if only the right side has such predicates
    + *
    + * This rule should be executed before pushing down the Filter
    + */
    +object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  private def containsAttr(plan: LogicalPlan, attr: Attribute): Boolean =
    +    plan.outputSet.exists(_.semanticEquals(attr))
    +
    +  private def hasNullFilteringPredicate(predicate: Expression, plan: LogicalPlan): Boolean = {
    --- End diff --
    
    @gatorsmile now that we propagate IsNotNull constraints in the logical plan, you should be able to eliminate outer joins by simply looking into the constraints of the parent filter operator. I believe something along these lines should work:
    
    ```scala
    /**
     * Rewrites an outer join that sits directly beneath a [[Filter]] into a more restrictive
     * join type, based on the `IsNotNull` constraints carried by that filter:
     *
     * - right outer -> inner if an `IsNotNull` constraint references the left side
     * - left outer  -> inner if an `IsNotNull` constraint references the right side
     * - full outer  -> inner if such constraints reference both sides
     * - full outer  -> left outer if such constraints reference only the left side
     * - full outer  -> right outer if such constraints reference only the right side
     */
    object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper {
    
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        // Only a Filter whose direct child is an outer join is a candidate for rewriting.
        case parent @ Filter(
            predicate, child @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
          Filter(predicate, buildNewJoin(parent, child))
      }
    
      /**
       * Returns `join` with its join type tightened according to which of its sides are
       * referenced by the `IsNotNull` constraints of `filter`; returns `join` itself when no
       * rewrite applies.
       */
      private def buildNewJoin(filter: Filter, join: Join): Join = {
        val notNullConstraints = filter.constraints.filter(_.isInstanceOf[IsNotNull])
    
        // True when at least one IsNotNull constraint references an output attribute of `side`.
        def isNullFiltered(side: LogicalPlan): Boolean =
          notNullConstraints.exists { constraint =>
            side.outputSet.intersect(constraint.references).nonEmpty
          }
    
        val leftFiltered = isNullFiltered(join.left)
        val rightFiltered = isNullFiltered(join.right)
    
        (join.joinType, leftFiltered, rightFiltered) match {
          case (RightOuter, true, _) | (LeftOuter, _, true) | (FullOuter, true, true) =>
            join.copy(joinType = Inner)
          case (FullOuter, true, false) =>
            join.copy(joinType = LeftOuter)
          case (FullOuter, false, true) =>
            join.copy(joinType = RightOuter)
          case _ =>
            join
        }
      }
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org