You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/03/11 02:29:48 UTC

spark git commit: [SPARK-13789] Infer additional constraints from attribute equality

Repository: spark
Updated Branches:
  refs/heads/master 416e71af4 -> c3a6269ca


[SPARK-13789] Infer additional constraints from attribute equality

## What changes were proposed in this pull request?

This PR adds support for inferring an additional set of data constraints based on attribute equality. For example, if an operator has constraints of the form (`a = 5`, `a = b`), we can now automatically infer an additional constraint of the form `b = 5`.

## How was this patch tested?

Tested that new constraints are properly inferred for filters (by adding a new test) and equi-joins (by modifying an existing test).

Author: Sameer Agarwal <sa...@databricks.com>

Closes #11618 from sameeragarwal/infer-isequal-constraints.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3a6269c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3a6269c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3a6269c

Branch: refs/heads/master
Commit: c3a6269ca994a977303a450043a577f435565f4e
Parents: 416e71a
Author: Sameer Agarwal <sa...@databricks.com>
Authored: Thu Mar 10 17:29:45 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Mar 10 17:29:45 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/plans/QueryPlan.scala    | 21 ++++++++++++++++++++
 .../plans/ConstraintPropagationSuite.scala      | 14 +++++++++++++
 2 files changed, 35 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c3a6269c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 40c06ed..c222571 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -32,6 +32,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
    */
   protected def getRelevantConstraints(constraints: Set[Expression]): Set[Expression] = {
     constraints
+      .union(inferAdditionalConstraints(constraints))
       .union(constructIsNotNullConstraints(constraints))
       .filter(constraint =>
         constraint.references.nonEmpty && constraint.references.subsetOf(outputSet))
@@ -64,6 +65,26 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
   }
 
   /**
+   * Infers an additional set of constraints from a given set of equality constraints.
+   * For example, if an operator has constraints of the form (`a = 5`, `a = b`), this returns
+   * an additional constraint of the form `b = 5`.
+   */
+  private def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] = {
+    var inferredConstraints = Set.empty[Expression]
+    constraints.foreach {
+      case eq @ EqualTo(l: Attribute, r: Attribute) =>
+        inferredConstraints ++= (constraints - eq).map(_ transform {
+          case a: Attribute if a.semanticEquals(l) => r
+        })
+        inferredConstraints ++= (constraints - eq).map(_ transform {
+          case a: Attribute if a.semanticEquals(r) => l
+        })
+      case _ => // No inference
+    }
+    inferredConstraints -- constraints
+  }
+
+  /**
    * An [[ExpressionSet]] that contains invariants about the rows output by this operator. For
    * example, if this set contains the expression `a = 2` then that expression is guaranteed to
    * evaluate to `true` for all rows produced.

http://git-wip-us.apache.org/repos/asf/spark/blob/c3a6269c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
index e70d379..a9375a7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
@@ -158,6 +158,7 @@ class ConstraintPropagationSuite extends SparkFunSuite {
         tr2.resolveQuoted("d", caseInsensitiveResolution).get < 100,
         tr1.resolveQuoted("a", caseInsensitiveResolution).get ===
           tr2.resolveQuoted("a", caseInsensitiveResolution).get,
+        tr2.resolveQuoted("a", caseInsensitiveResolution).get > 10,
         IsNotNull(tr2.resolveQuoted("a", caseInsensitiveResolution).get),
         IsNotNull(tr1.resolveQuoted("a", caseInsensitiveResolution).get),
         IsNotNull(tr2.resolveQuoted("d", caseInsensitiveResolution).get))))
@@ -203,4 +204,17 @@ class ConstraintPropagationSuite extends SparkFunSuite {
       .join(tr2.where('d.attr < 100), FullOuter, Some("tr1.a".attr === "tr2.a".attr))
       .analyze.constraints.isEmpty)
   }
+
+  test("infer additional constraints in filters") {
+    val tr = LocalRelation('a.int, 'b.int, 'c.int)
+
+    verifyConstraints(tr
+      .where('a.attr > 10 && 'a.attr === 'b.attr)
+      .analyze.constraints,
+      ExpressionSet(Seq(resolveColumn(tr, "a") > 10,
+        resolveColumn(tr, "b") > 10,
+        resolveColumn(tr, "a") === resolveColumn(tr, "b"),
+        IsNotNull(resolveColumn(tr, "a")),
+        IsNotNull(resolveColumn(tr, "b")))))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org