You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/04/18 02:36:50 UTC
spark git commit: [SPARK-21479][SQL] Outer join filter pushdown in
null supplying table when condition is on one of the joined columns
Repository: spark
Updated Branches:
refs/heads/master 5fccdae18 -> 1e3b8762a
[SPARK-21479][SQL] Outer join filter pushdown in null supplying table when condition is on one of the joined columns
## What changes were proposed in this pull request?
Extended the `InferFiltersFromConstraints` optimization rule so that, for left/right outer joins, filters inferred from the constraints of the preserved side (combined with the join condition) are pushed down to the null-supplying side. The constraints of the Join operator itself remain unchanged.
## How was this patch tested?
Added three tests to `InferFiltersFromConstraintsSuite`.
Author: maryannxue <ma...@gmail.com>
Closes #20816 from maryannxue/spark-21479.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e3b8762
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e3b8762
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e3b8762
Branch: refs/heads/master
Commit: 1e3b8762a854a07c317f69fba7fa1a7bcdc58ff3
Parents: 5fccdae
Author: maryannxue <ma...@gmail.com>
Authored: Wed Apr 18 10:36:41 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Apr 18 10:36:41 2018 +0800
----------------------------------------------------------------------
.../sql/catalyst/optimizer/Optimizer.scala | 42 ++++++++++++++++++--
.../plans/logical/QueryPlanConstraints.scala | 25 ++++++++++--
.../InferFiltersFromConstraintsSuite.scala | 36 +++++++++++++++++
3 files changed, 96 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1e3b8762/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 5fb59ef..913354e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -637,8 +637,11 @@ object CollapseWindow extends Rule[LogicalPlan] {
* constraints. These filters are currently inserted to the existing conditions in the Filter
* operators and on either side of Join operators.
*
- * Note: While this optimization is applicable to all types of join, it primarily benefits Inner and
- * LeftSemi joins.
+ * In addition, for left/right outer joins, infer predicate from the preserved side of the Join
+ * operator and push the inferred filter over to the null-supplying side. For example, if the
+ * preserved side has constraints of the form 'a > 5' and the join condition is 'a = b', in
+ * which 'b' is an attribute from the null-supplying side, a [[Filter]] operator of 'b > 5' will
+ * be applied to the null-supplying side.
*/
object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
@@ -671,11 +674,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
val newConditionOpt = conditionOpt match {
case Some(condition) =>
val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
- if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+ if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
case None =>
additionalConstraints.reduceOption(And)
}
- if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+ // Infer filter for left/right outer joins
+ val newLeftOpt = joinType match {
+ case RightOuter if newConditionOpt.isDefined =>
+ val inferredConstraints = left.getRelevantConstraints(
+ left.constraints
+ .union(right.constraints)
+ .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
+ val newFilters = inferredConstraints
+ .filterNot(left.constraints.contains)
+ .reduceLeftOption(And)
+ newFilters.map(Filter(_, left))
+ case _ => None
+ }
+ val newRightOpt = joinType match {
+ case LeftOuter if newConditionOpt.isDefined =>
+ val inferredConstraints = right.getRelevantConstraints(
+ right.constraints
+ .union(left.constraints)
+ .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
+ val newFilters = inferredConstraints
+ .filterNot(right.constraints.contains)
+ .reduceLeftOption(And)
+ newFilters.map(Filter(_, right))
+ case _ => None
+ }
+
+ if ((newConditionOpt.isDefined && (newConditionOpt ne conditionOpt))
+ || newLeftOpt.isDefined || newRightOpt.isDefined) {
+ Join(newLeftOpt.getOrElse(left), newRightOpt.getOrElse(right), joinType, newConditionOpt)
+ } else {
+ join
+ }
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1e3b8762/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
index 0468488..a29f3d2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
@@ -41,9 +41,7 @@ trait QueryPlanConstraints { self: LogicalPlan =>
* example, if this set contains the expression `a = 2` then that expression is guaranteed to
* evaluate to `true` for all rows produced.
*/
- lazy val constraints: ExpressionSet = ExpressionSet(allConstraints.filter { c =>
- c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic
- })
+ lazy val constraints: ExpressionSet = ExpressionSet(allConstraints.filter(selfReferenceOnly))
/**
* This method can be overridden by any child class of QueryPlan to specify a set of constraints
@@ -56,6 +54,23 @@ trait QueryPlanConstraints { self: LogicalPlan =>
protected def validConstraints: Set[Expression] = Set.empty
/**
+ * Returns an [[ExpressionSet]] that contains an additional set of constraints, such as
+ * equality constraints and `isNotNull` constraints, etc., and that only contains references
+ * to this [[LogicalPlan]] node.
+ */
+ def getRelevantConstraints(constraints: Set[Expression]): ExpressionSet = {
+ val allRelevantConstraints =
+ if (conf.constraintPropagationEnabled) {
+ constraints
+ .union(inferAdditionalConstraints(constraints))
+ .union(constructIsNotNullConstraints(constraints))
+ } else {
+ constraints
+ }
+ ExpressionSet(allRelevantConstraints.filter(selfReferenceOnly))
+ }
+
+ /**
* Infers a set of `isNotNull` constraints from null intolerant expressions as well as
* non-nullable attributes. For e.g., if an expression is of the form (`a > 5`), this
* returns a constraint of the form `isNotNull(a)`
@@ -120,4 +135,8 @@ trait QueryPlanConstraints { self: LogicalPlan =>
destination: Attribute): Set[Expression] = constraints.map(_ transform {
case e: Expression if e.semanticEquals(source) => destination
})
+
+ private def selfReferenceOnly(e: Expression): Boolean = {
+ e.references.nonEmpty && e.references.subsetOf(outputSet) && e.deterministic
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1e3b8762/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
index f78c235..e068f51 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
@@ -204,4 +204,40 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}
+
+ test("SPARK-21479: Outer join after-join filters push down to null-supplying side") {
+ val x = testRelation.subquery('x)
+ val y = testRelation.subquery('y)
+ val condition = Some("x.a".attr === "y.a".attr)
+ val originalQuery = x.join(y, LeftOuter, condition).where("x.a".attr === 2).analyze
+ val left = x.where(IsNotNull('a) && 'a === 2)
+ val right = y.where(IsNotNull('a) && 'a === 2)
+ val correctAnswer = left.join(right, LeftOuter, condition).analyze
+ val optimized = Optimize.execute(originalQuery)
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("SPARK-21479: Outer join pre-existing filters push down to null-supplying side") {
+ val x = testRelation.subquery('x)
+ val y = testRelation.subquery('y)
+ val condition = Some("x.a".attr === "y.a".attr)
+ val originalQuery = x.join(y.where("y.a".attr > 5), RightOuter, condition).analyze
+ val left = x.where(IsNotNull('a) && 'a > 5)
+ val right = y.where(IsNotNull('a) && 'a > 5)
+ val correctAnswer = left.join(right, RightOuter, condition).analyze
+ val optimized = Optimize.execute(originalQuery)
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("SPARK-21479: Outer join no filter push down to preserved side") {
+ val x = testRelation.subquery('x)
+ val y = testRelation.subquery('y)
+ val condition = Some("x.a".attr === "y.a".attr)
+ val originalQuery = x.join(y.where("y.a".attr === 1), LeftOuter, condition).analyze
+ val left = x
+ val right = y.where(IsNotNull('a) && 'a === 1)
+ val correctAnswer = left.join(right, LeftOuter, condition).analyze
+ val optimized = Optimize.execute(originalQuery)
+ comparePlans(optimized, correctAnswer)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org