You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by gengliangwang <gi...@git.apache.org> on 2018/04/23 07:21:10 UTC
[GitHub] spark pull request #21083: [SPARK-23564][SQL] infer additional filters from ...
Github user gengliangwang commented on a diff in the pull request:
https://github.com/apache/spark/pull/21083#discussion_r183295844
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -664,53 +662,52 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
}
case join @ Join(left, right, joinType, conditionOpt) =>
- // Only consider constraints that can be pushed down completely to either the left or the
- // right child
- val constraints = join.allConstraints.filter { c =>
- c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)
- }
- // Remove those constraints that are already enforced by either the left or the right child
- val additionalConstraints = constraints -- (left.constraints ++ right.constraints)
- val newConditionOpt = conditionOpt match {
- case Some(condition) =>
- val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
- if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
- case None =>
- additionalConstraints.reduceOption(And)
- }
- // Infer filter for left/right outer joins
- val newLeftOpt = joinType match {
- case RightOuter if newConditionOpt.isDefined =>
- val inferredConstraints = left.getRelevantConstraints(
- left.constraints
- .union(right.constraints)
- .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
- val newFilters = inferredConstraints
- .filterNot(left.constraints.contains)
- .reduceLeftOption(And)
- newFilters.map(Filter(_, left))
- case _ => None
- }
- val newRightOpt = joinType match {
- case LeftOuter if newConditionOpt.isDefined =>
- val inferredConstraints = right.getRelevantConstraints(
- right.constraints
- .union(left.constraints)
- .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
- val newFilters = inferredConstraints
- .filterNot(right.constraints.contains)
- .reduceLeftOption(And)
- newFilters.map(Filter(_, right))
- case _ => None
- }
+ joinType match {
+ // For inner join, we can infer additional filters for both sides. LeftSemi is kind of an
+ // inner join, it just drops the right side in the final output.
+ case _: InnerLike | LeftSemi =>
+ val allConstraints = getAllConstraints(left, right, conditionOpt)
+ val newLeft = inferNewFilter(left, allConstraints)
+ val newRight = inferNewFilter(right, allConstraints)
+ join.copy(left = newLeft, right = newRight)
- if ((newConditionOpt.isDefined && (newConditionOpt ne conditionOpt))
- || newLeftOpt.isDefined || newRightOpt.isDefined) {
- Join(newLeftOpt.getOrElse(left), newRightOpt.getOrElse(right), joinType, newConditionOpt)
- } else {
- join
+ // For right outer join, we can only infer additional filters for left side.
+ case RightOuter =>
+ val allConstraints = getAllConstraints(left, right, conditionOpt)
+ val newLeft = inferNewFilter(left, allConstraints)
+ join.copy(left = newLeft)
+
+ // For left join, we can only infer additional filters for right side.
+ case LeftOuter | LeftAnti =>
+ val allConstraints = getAllConstraints(left, right, conditionOpt)
+ val newRight = inferNewFilter(right, allConstraints)
+ join.copy(right = newRight)
+
+ case _ => join
}
}
+
+ private def getAllConstraints(
+ left: LogicalPlan,
+ right: LogicalPlan,
+ conditionOpt: Option[Expression]): Set[Expression] = {
+ val baseConstraints = left.constraints.union(right.constraints)
+ .union(conditionOpt.map(splitConjunctivePredicates).getOrElse(Nil).toSet)
+ baseConstraints.union(inferAdditionalConstraints(baseConstraints))
+ }
+
+ private def inferNewFilter(plan: LogicalPlan, constraints: Set[Expression]): LogicalPlan = {
+ val newPredicates = constraints
+ .union(constructIsNotNullConstraints(constraints, plan.output))
--- End diff --
Can we put the code
```
constraints
.union(inferAdditionalConstraints(constraints))
.union(constructIsNotNullConstraints(constraints, output))
.filter { c =>
c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic
}
```
into a function in `ConstraintHelper`?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org