You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2017/06/14 21:28:23 UTC
spark git commit: [SPARK-21091][SQL] Move constraint code into
QueryPlanConstraints
Repository: spark
Updated Branches:
refs/heads/master 77a2fc5b5 -> e254e868f
[SPARK-21091][SQL] Move constraint code into QueryPlanConstraints
## What changes were proposed in this pull request?
This patch moves constraint related code into a separate trait QueryPlanConstraints, so we don't litter QueryPlan with a lot of constraint private functions.
## How was this patch tested?
This is a simple move refactoring and should be covered by existing tests.
Author: Reynold Xin <rx...@databricks.com>
Closes #18298 from rxin/SPARK-21091.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e254e868
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e254e868
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e254e868
Branch: refs/heads/master
Commit: e254e868f1e640b59d8d3e0e01a5e0c488dd6e70
Parents: 77a2fc5
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed Jun 14 14:28:21 2017 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Jun 14 14:28:21 2017 -0700
----------------------------------------------------------------------
.../spark/sql/catalyst/plans/QueryPlan.scala | 187 +----------------
.../catalyst/plans/QueryPlanConstraints.scala | 206 +++++++++++++++++++
2 files changed, 210 insertions(+), 183 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e254e868/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 5ba043e..8bc462e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -21,194 +21,15 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.types.{DataType, StructType}
-abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {
+abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
+ extends TreeNode[PlanType]
+ with QueryPlanConstraints[PlanType] {
+
self: PlanType =>
def output: Seq[Attribute]
/**
- * Extracts the relevant constraints from a given set of constraints based on the attributes that
- * appear in the [[outputSet]].
- */
- protected def getRelevantConstraints(constraints: Set[Expression]): Set[Expression] = {
- constraints
- .union(inferAdditionalConstraints(constraints))
- .union(constructIsNotNullConstraints(constraints))
- .filter(constraint =>
- constraint.references.nonEmpty && constraint.references.subsetOf(outputSet) &&
- constraint.deterministic)
- }
-
- /**
- * Infers a set of `isNotNull` constraints from null intolerant expressions as well as
- * non-nullable attributes. For e.g., if an expression is of the form (`a > 5`), this
- * returns a constraint of the form `isNotNull(a)`
- */
- private def constructIsNotNullConstraints(constraints: Set[Expression]): Set[Expression] = {
- // First, we propagate constraints from the null intolerant expressions.
- var isNotNullConstraints: Set[Expression] = constraints.flatMap(inferIsNotNullConstraints)
-
- // Second, we infer additional constraints from non-nullable attributes that are part of the
- // operator's output
- val nonNullableAttributes = output.filterNot(_.nullable)
- isNotNullConstraints ++= nonNullableAttributes.map(IsNotNull).toSet
-
- isNotNullConstraints -- constraints
- }
-
- /**
- * Infer the Attribute-specific IsNotNull constraints from the null intolerant child expressions
- * of constraints.
- */
- private def inferIsNotNullConstraints(constraint: Expression): Seq[Expression] =
- constraint match {
- // When the root is IsNotNull, we can push IsNotNull through the child null intolerant
- // expressions
- case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_))
- // Constraints always return true for all the inputs. That means, null will never be returned.
- // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child
- // null intolerant expressions.
- case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_))
- }
-
- /**
- * Recursively explores the expressions which are null intolerant and returns all attributes
- * in these expressions.
- */
- private def scanNullIntolerantAttribute(expr: Expression): Seq[Attribute] = expr match {
- case a: Attribute => Seq(a)
- case _: NullIntolerant => expr.children.flatMap(scanNullIntolerantAttribute)
- case _ => Seq.empty[Attribute]
- }
-
- // Collect aliases from expressions of the whole tree rooted by the current QueryPlan node, so
- // we may avoid producing recursive constraints.
- private lazy val aliasMap: AttributeMap[Expression] = AttributeMap(
- expressions.collect {
- case a: Alias => (a.toAttribute, a.child)
- } ++ children.flatMap(_.aliasMap))
-
- /**
- * Infers an additional set of constraints from a given set of equality constraints.
- * For e.g., if an operator has constraints of the form (`a = 5`, `a = b`), this returns an
- * additional constraint of the form `b = 5`.
- *
- * [SPARK-17733] We explicitly prevent producing recursive constraints of the form `a = f(a, b)`
- * as they are often useless and can lead to a non-converging set of constraints.
- */
- private def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] = {
- val constraintClasses = generateEquivalentConstraintClasses(constraints)
-
- var inferredConstraints = Set.empty[Expression]
- constraints.foreach {
- case eq @ EqualTo(l: Attribute, r: Attribute) =>
- val candidateConstraints = constraints - eq
- inferredConstraints ++= candidateConstraints.map(_ transform {
- case a: Attribute if a.semanticEquals(l) &&
- !isRecursiveDeduction(r, constraintClasses) => r
- })
- inferredConstraints ++= candidateConstraints.map(_ transform {
- case a: Attribute if a.semanticEquals(r) &&
- !isRecursiveDeduction(l, constraintClasses) => l
- })
- case _ => // No inference
- }
- inferredConstraints -- constraints
- }
-
- /*
- * Generate a sequence of expression sets from constraints, where each set stores an equivalence
- * class of expressions. For example, Set(`a = b`, `b = c`, `e = f`) will generate the following
- * expression sets: (Set(a, b, c), Set(e, f)). This will be used to search all expressions equal
- * to an selected attribute.
- */
- private def generateEquivalentConstraintClasses(
- constraints: Set[Expression]): Seq[Set[Expression]] = {
- var constraintClasses = Seq.empty[Set[Expression]]
- constraints.foreach {
- case eq @ EqualTo(l: Attribute, r: Attribute) =>
- // Transform [[Alias]] to its child.
- val left = aliasMap.getOrElse(l, l)
- val right = aliasMap.getOrElse(r, r)
- // Get the expression set for an equivalence constraint class.
- val leftConstraintClass = getConstraintClass(left, constraintClasses)
- val rightConstraintClass = getConstraintClass(right, constraintClasses)
- if (leftConstraintClass.nonEmpty && rightConstraintClass.nonEmpty) {
- // Combine the two sets.
- constraintClasses = constraintClasses
- .diff(leftConstraintClass :: rightConstraintClass :: Nil) :+
- (leftConstraintClass ++ rightConstraintClass)
- } else if (leftConstraintClass.nonEmpty) { // && rightConstraintClass.isEmpty
- // Update equivalence class of `left` expression.
- constraintClasses = constraintClasses
- .diff(leftConstraintClass :: Nil) :+ (leftConstraintClass + right)
- } else if (rightConstraintClass.nonEmpty) { // && leftConstraintClass.isEmpty
- // Update equivalence class of `right` expression.
- constraintClasses = constraintClasses
- .diff(rightConstraintClass :: Nil) :+ (rightConstraintClass + left)
- } else { // leftConstraintClass.isEmpty && rightConstraintClass.isEmpty
- // Create new equivalence constraint class since neither expression presents
- // in any classes.
- constraintClasses = constraintClasses :+ Set(left, right)
- }
- case _ => // Skip
- }
-
- constraintClasses
- }
-
- /*
- * Get all expressions equivalent to the selected expression.
- */
- private def getConstraintClass(
- expr: Expression,
- constraintClasses: Seq[Set[Expression]]): Set[Expression] =
- constraintClasses.find(_.contains(expr)).getOrElse(Set.empty[Expression])
-
- /*
- * Check whether replace by an [[Attribute]] will cause a recursive deduction. Generally it
- * has the form like: `a -> f(a, b)`, where `a` and `b` are expressions and `f` is a function.
- * Here we first get all expressions equal to `attr` and then check whether at least one of them
- * is a child of the referenced expression.
- */
- private def isRecursiveDeduction(
- attr: Attribute,
- constraintClasses: Seq[Set[Expression]]): Boolean = {
- val expr = aliasMap.getOrElse(attr, attr)
- getConstraintClass(expr, constraintClasses).exists { e =>
- expr.children.exists(_.semanticEquals(e))
- }
- }
-
- /**
- * An [[ExpressionSet]] that contains invariants about the rows output by this operator. For
- * example, if this set contains the expression `a = 2` then that expression is guaranteed to
- * evaluate to `true` for all rows produced.
- */
- lazy val constraints: ExpressionSet = ExpressionSet(getRelevantConstraints(validConstraints))
-
- /**
- * Returns [[constraints]] depending on the config of enabling constraint propagation. If the
- * flag is disabled, simply returning an empty constraints.
- */
- private[spark] def getConstraints(constraintPropagationEnabled: Boolean): ExpressionSet =
- if (constraintPropagationEnabled) {
- constraints
- } else {
- ExpressionSet(Set.empty)
- }
-
- /**
- * This method can be overridden by any child class of QueryPlan to specify a set of constraints
- * based on the given operator's constraint propagation logic. These constraints are then
- * canonicalized and filtered automatically to contain only those attributes that appear in the
- * [[outputSet]].
- *
- * See [[Canonicalize]] for more details.
- */
- protected def validConstraints: Set[Expression] = Set.empty
-
- /**
* Returns the set of attributes that are output by this node.
*/
def outputSet: AttributeSet = AttributeSet(output)
http://git-wip-us.apache.org/repos/asf/spark/blob/e254e868/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlanConstraints.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlanConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlanConstraints.scala
new file mode 100644
index 0000000..7d8a17d
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlanConstraints.scala
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans
+
+import org.apache.spark.sql.catalyst.expressions._
+
+
+trait QueryPlanConstraints[PlanType <: QueryPlan[PlanType]] { self: QueryPlan[PlanType] =>
+
+ /**
+ * An [[ExpressionSet]] that contains invariants about the rows output by this operator. For
+ * example, if this set contains the expression `a = 2` then that expression is guaranteed to
+ * evaluate to `true` for all rows produced.
+ */
+ lazy val constraints: ExpressionSet = ExpressionSet(getRelevantConstraints(validConstraints))
+
+ /**
+ * Returns [[constraints]] depending on the config of enabling constraint propagation. If the
+ * flag is disabled, simply returning an empty constraints.
+ */
+ def getConstraints(constraintPropagationEnabled: Boolean): ExpressionSet =
+ if (constraintPropagationEnabled) {
+ constraints
+ } else {
+ ExpressionSet(Set.empty)
+ }
+
+ /**
+ * This method can be overridden by any child class of QueryPlan to specify a set of constraints
+ * based on the given operator's constraint propagation logic. These constraints are then
+ * canonicalized and filtered automatically to contain only those attributes that appear in the
+ * [[outputSet]].
+ *
+ * See [[Canonicalize]] for more details.
+ */
+ protected def validConstraints: Set[Expression] = Set.empty
+
+ /**
+ * Extracts the relevant constraints from a given set of constraints based on the attributes that
+ * appear in the [[outputSet]].
+ */
+ protected def getRelevantConstraints(constraints: Set[Expression]): Set[Expression] = {
+ constraints
+ .union(inferAdditionalConstraints(constraints))
+ .union(constructIsNotNullConstraints(constraints))
+ .filter(constraint =>
+ constraint.references.nonEmpty && constraint.references.subsetOf(outputSet) &&
+ constraint.deterministic)
+ }
+
+ /**
+ * Infers a set of `isNotNull` constraints from null intolerant expressions as well as
+ * non-nullable attributes. For e.g., if an expression is of the form (`a > 5`), this
+ * returns a constraint of the form `isNotNull(a)`
+ */
+ private def constructIsNotNullConstraints(constraints: Set[Expression]): Set[Expression] = {
+ // First, we propagate constraints from the null intolerant expressions.
+ var isNotNullConstraints: Set[Expression] = constraints.flatMap(inferIsNotNullConstraints)
+
+ // Second, we infer additional constraints from non-nullable attributes that are part of the
+ // operator's output
+ val nonNullableAttributes = output.filterNot(_.nullable)
+ isNotNullConstraints ++= nonNullableAttributes.map(IsNotNull).toSet
+
+ isNotNullConstraints -- constraints
+ }
+
+ /**
+ * Infer the Attribute-specific IsNotNull constraints from the null intolerant child expressions
+ * of constraints.
+ */
+ private def inferIsNotNullConstraints(constraint: Expression): Seq[Expression] =
+ constraint match {
+ // When the root is IsNotNull, we can push IsNotNull through the child null intolerant
+ // expressions
+ case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_))
+ // Constraints always return true for all the inputs. That means, null will never be returned.
+ // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child
+ // null intolerant expressions.
+ case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_))
+ }
+
+ /**
+ * Recursively explores the expressions which are null intolerant and returns all attributes
+ * in these expressions.
+ */
+ private def scanNullIntolerantAttribute(expr: Expression): Seq[Attribute] = expr match {
+ case a: Attribute => Seq(a)
+ case _: NullIntolerant => expr.children.flatMap(scanNullIntolerantAttribute)
+ case _ => Seq.empty[Attribute]
+ }
+
+ // Collect aliases from expressions of the whole tree rooted by the current QueryPlan node, so
+ // we may avoid producing recursive constraints.
+ private lazy val aliasMap: AttributeMap[Expression] = AttributeMap(
+ expressions.collect {
+ case a: Alias => (a.toAttribute, a.child)
+ } ++ children.flatMap(_.asInstanceOf[QueryPlanConstraints[PlanType]].aliasMap))
+
+ /**
+ * Infers an additional set of constraints from a given set of equality constraints.
+ * For e.g., if an operator has constraints of the form (`a = 5`, `a = b`), this returns an
+ * additional constraint of the form `b = 5`.
+ *
+ * [SPARK-17733] We explicitly prevent producing recursive constraints of the form `a = f(a, b)`
+ * as they are often useless and can lead to a non-converging set of constraints.
+ */
+ private def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] = {
+ val constraintClasses = generateEquivalentConstraintClasses(constraints)
+
+ var inferredConstraints = Set.empty[Expression]
+ constraints.foreach {
+ case eq @ EqualTo(l: Attribute, r: Attribute) =>
+ val candidateConstraints = constraints - eq
+ inferredConstraints ++= candidateConstraints.map(_ transform {
+ case a: Attribute if a.semanticEquals(l) &&
+ !isRecursiveDeduction(r, constraintClasses) => r
+ })
+ inferredConstraints ++= candidateConstraints.map(_ transform {
+ case a: Attribute if a.semanticEquals(r) &&
+ !isRecursiveDeduction(l, constraintClasses) => l
+ })
+ case _ => // No inference
+ }
+ inferredConstraints -- constraints
+ }
+
+ /**
+ * Generate a sequence of expression sets from constraints, where each set stores an equivalence
+ * class of expressions. For example, Set(`a = b`, `b = c`, `e = f`) will generate the following
+ * expression sets: (Set(a, b, c), Set(e, f)). This will be used to search all expressions equal
+ * to an selected attribute.
+ */
+ private def generateEquivalentConstraintClasses(
+ constraints: Set[Expression]): Seq[Set[Expression]] = {
+ var constraintClasses = Seq.empty[Set[Expression]]
+ constraints.foreach {
+ case eq @ EqualTo(l: Attribute, r: Attribute) =>
+ // Transform [[Alias]] to its child.
+ val left = aliasMap.getOrElse(l, l)
+ val right = aliasMap.getOrElse(r, r)
+ // Get the expression set for an equivalence constraint class.
+ val leftConstraintClass = getConstraintClass(left, constraintClasses)
+ val rightConstraintClass = getConstraintClass(right, constraintClasses)
+ if (leftConstraintClass.nonEmpty && rightConstraintClass.nonEmpty) {
+ // Combine the two sets.
+ constraintClasses = constraintClasses
+ .diff(leftConstraintClass :: rightConstraintClass :: Nil) :+
+ (leftConstraintClass ++ rightConstraintClass)
+ } else if (leftConstraintClass.nonEmpty) { // && rightConstraintClass.isEmpty
+ // Update equivalence class of `left` expression.
+ constraintClasses = constraintClasses
+ .diff(leftConstraintClass :: Nil) :+ (leftConstraintClass + right)
+ } else if (rightConstraintClass.nonEmpty) { // && leftConstraintClass.isEmpty
+ // Update equivalence class of `right` expression.
+ constraintClasses = constraintClasses
+ .diff(rightConstraintClass :: Nil) :+ (rightConstraintClass + left)
+ } else { // leftConstraintClass.isEmpty && rightConstraintClass.isEmpty
+ // Create new equivalence constraint class since neither expression presents
+ // in any classes.
+ constraintClasses = constraintClasses :+ Set(left, right)
+ }
+ case _ => // Skip
+ }
+
+ constraintClasses
+ }
+
+ /**
+ * Get all expressions equivalent to the selected expression.
+ */
+ private def getConstraintClass(
+ expr: Expression,
+ constraintClasses: Seq[Set[Expression]]): Set[Expression] =
+ constraintClasses.find(_.contains(expr)).getOrElse(Set.empty[Expression])
+
+ /**
+ * Check whether replace by an [[Attribute]] will cause a recursive deduction. Generally it
+ * has the form like: `a -> f(a, b)`, where `a` and `b` are expressions and `f` is a function.
+ * Here we first get all expressions equal to `attr` and then check whether at least one of them
+ * is a child of the referenced expression.
+ */
+ private def isRecursiveDeduction(
+ attr: Attribute,
+ constraintClasses: Seq[Set[Expression]]): Boolean = {
+ val expr = aliasMap.getOrElse(attr, attr)
+ getConstraintClass(expr, constraintClasses).exists { e =>
+ expr.children.exists(_.semanticEquals(e))
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org