You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by gengliangwang <gi...@git.apache.org> on 2018/04/23 07:21:10 UTC

[GitHub] spark pull request #21083: [SPARK-23564][SQL] infer additional filters from ...

Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21083#discussion_r183295844
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -664,53 +662,52 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
           }
     
         case join @ Join(left, right, joinType, conditionOpt) =>
    -      // Only consider constraints that can be pushed down completely to either the left or the
    -      // right child
    -      val constraints = join.allConstraints.filter { c =>
    -        c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)
    -      }
    -      // Remove those constraints that are already enforced by either the left or the right child
    -      val additionalConstraints = constraints -- (left.constraints ++ right.constraints)
    -      val newConditionOpt = conditionOpt match {
    -        case Some(condition) =>
    -          val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
    -          if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
    -        case None =>
    -          additionalConstraints.reduceOption(And)
    -      }
    -      // Infer filter for left/right outer joins
    -      val newLeftOpt = joinType match {
    -        case RightOuter if newConditionOpt.isDefined =>
    -          val inferredConstraints = left.getRelevantConstraints(
    -            left.constraints
    -              .union(right.constraints)
    -              .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
    -          val newFilters = inferredConstraints
    -            .filterNot(left.constraints.contains)
    -            .reduceLeftOption(And)
    -          newFilters.map(Filter(_, left))
    -        case _ => None
    -      }
    -      val newRightOpt = joinType match {
    -        case LeftOuter if newConditionOpt.isDefined =>
    -          val inferredConstraints = right.getRelevantConstraints(
    -            right.constraints
    -              .union(left.constraints)
    -              .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
    -          val newFilters = inferredConstraints
    -            .filterNot(right.constraints.contains)
    -            .reduceLeftOption(And)
    -          newFilters.map(Filter(_, right))
    -        case _ => None
    -      }
    +      joinType match {
    +        // For inner join, we can infer additional filters for both sides. LeftSemi is kind of an
    +        // inner join, it just drops the right side in the final output.
    +        case _: InnerLike | LeftSemi =>
    +          val allConstraints = getAllConstraints(left, right, conditionOpt)
    +          val newLeft = inferNewFilter(left, allConstraints)
    +          val newRight = inferNewFilter(right, allConstraints)
    +          join.copy(left = newLeft, right = newRight)
     
    -      if ((newConditionOpt.isDefined && (newConditionOpt ne conditionOpt))
    -        || newLeftOpt.isDefined || newRightOpt.isDefined) {
    -        Join(newLeftOpt.getOrElse(left), newRightOpt.getOrElse(right), joinType, newConditionOpt)
    -      } else {
    -        join
    +        // For right outer join, we can only infer additional filters for left side.
    +        case RightOuter =>
    +          val allConstraints = getAllConstraints(left, right, conditionOpt)
    +          val newLeft = inferNewFilter(left, allConstraints)
    +          join.copy(left = newLeft)
    +
    +        // For left join, we can only infer additional filters for right side.
    +        case LeftOuter | LeftAnti =>
    +          val allConstraints = getAllConstraints(left, right, conditionOpt)
    +          val newRight = inferNewFilter(right, allConstraints)
    +          join.copy(right = newRight)
    +
    +        case _ => join
           }
       }
    +
    +  private def getAllConstraints(
    +      left: LogicalPlan,
    +      right: LogicalPlan,
    +      conditionOpt: Option[Expression]): Set[Expression] = {
    +    val baseConstraints = left.constraints.union(right.constraints)
    +      .union(conditionOpt.map(splitConjunctivePredicates).getOrElse(Nil).toSet)
    +    baseConstraints.union(inferAdditionalConstraints(baseConstraints))
    +  }
    +
    +  private def inferNewFilter(plan: LogicalPlan, constraints: Set[Expression]): LogicalPlan = {
    +    val newPredicates = constraints
    +      .union(constructIsNotNullConstraints(constraints, plan.output))
    --- End diff --
    
    Can we put the code 
    ```
    constraints
              .union(inferAdditionalConstraints(constraints))
              .union(constructIsNotNullConstraints(constraints, output))
              .filter { c =>
                c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic
              }
    ```
    into a function in `ConstraintHelper`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org