You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2016/11/30 11:41:04 UTC

spark git commit: [SPARK-17897][SQL] Fixed IsNotNull Constraint Inference Rule

Repository: spark
Updated Branches:
  refs/heads/master c5a64d760 -> 2eb093dec


[SPARK-17897][SQL] Fixed IsNotNull Constraint Inference Rule

### What changes were proposed in this pull request?
The `constraints` of an operator are the expressions that evaluate to `true` for all the rows produced. That means the expression result is neither `false` nor `unknown` (NULL). Thus, we can conclude that `IsNotNull` holds for all the constraints, whether they are generated by the operator's own predicates or propagated from its children. A constraint can be a complex expression. To make better use of these constraints, we try to push `IsNotNull` down to the lowest-level expressions (i.e., `Attribute`). `IsNotNull` can be pushed through an expression when that expression is null intolerant. (When the input is NULL, a null-intolerant expression always evaluates to NULL.)

Below is the existing code we have for `IsNotNull` pushdown.
```Scala
  private def scanNullIntolerantExpr(expr: Expression): Seq[Attribute] = expr match {
    case a: Attribute => Seq(a)
    case _: NullIntolerant | IsNotNull(_: NullIntolerant) =>
      expr.children.flatMap(scanNullIntolerantExpr)
    case _ => Seq.empty[Attribute]
  }
```

**`IsNotNull` itself is not null-intolerant.** It converts `null` to `false`. If the constraint does not include any `Not`-like expression, the existing inference works; otherwise, it can generate a wrong result. This PR fixes the above function by removing `IsNotNull` from the inference. After the fix, when a constraint contains an `IsNotNull` expression, we infer new attribute-specific `IsNotNull` constraints if and only if `IsNotNull` appears at the root.

Without the fix, the following test case returns an empty result.
```Scala
val data = Seq[java.lang.Integer](1, null).toDF("key")
data.filter("not key is not null").show()
```
Before the fix, the optimized plan looks like this:
```
== Optimized Logical Plan ==
Project [value#1 AS key#3]
+- Filter (isnotnull(value#1) && NOT isnotnull(value#1))
   +- LocalRelation [value#1]
```

After the fix, the optimized plan looks like this:
```
== Optimized Logical Plan ==
Project [value#1 AS key#3]
+- Filter NOT isnotnull(value#1)
   +- LocalRelation [value#1]
```

### How was this patch tested?
Added a test

Author: gatorsmile <ga...@gmail.com>

Closes #16067 from gatorsmile/isNotNull2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2eb093de
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2eb093de
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2eb093de

Branch: refs/heads/master
Commit: 2eb093decb5e87a1ea71bbaa28092876a8c84996
Parents: c5a64d7
Author: gatorsmile <ga...@gmail.com>
Authored: Wed Nov 30 19:40:58 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Nov 30 19:40:58 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/plans/QueryPlan.scala    | 27 +++++++++++++++-----
 .../plans/ConstraintPropagationSuite.scala      |  9 +++++++
 .../org/apache/spark/sql/DataFrameSuite.scala   |  6 +++++
 3 files changed, 35 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2eb093de/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 45ee296..b108017 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -40,14 +40,13 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
   }
 
   /**
-   * Infers a set of `isNotNull` constraints from a given set of equality/comparison expressions as
-   * well as non-nullable attributes. For e.g., if an expression is of the form (`a > 5`), this
+   * Infers a set of `isNotNull` constraints from null intolerant expressions as well as
+   * non-nullable attributes. For e.g., if an expression is of the form (`a > 5`), this
    * returns a constraint of the form `isNotNull(a)`
    */
   private def constructIsNotNullConstraints(constraints: Set[Expression]): Set[Expression] = {
     // First, we propagate constraints from the null intolerant expressions.
-    var isNotNullConstraints: Set[Expression] =
-      constraints.flatMap(scanNullIntolerantExpr).map(IsNotNull(_))
+    var isNotNullConstraints: Set[Expression] = constraints.flatMap(inferIsNotNullConstraints)
 
     // Second, we infer additional constraints from non-nullable attributes that are part of the
     // operator's output
@@ -58,13 +57,27 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
   }
 
   /**
+   * Infer the Attribute-specific IsNotNull constraints from the null intolerant child expressions
+   * of constraints.
+   */
+  private def inferIsNotNullConstraints(constraint: Expression): Seq[Expression] =
+    constraint match {
+      // When the root is IsNotNull, we can push IsNotNull through the child null intolerant
+      // expressions
+      case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_))
+      // Constraints always return true for all the inputs. That means, null will never be returned.
+      // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child
+      // null intolerant expressions.
+      case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_))
+    }
+
+  /**
    * Recursively explores the expressions which are null intolerant and returns all attributes
    * in these expressions.
    */
-  private def scanNullIntolerantExpr(expr: Expression): Seq[Attribute] = expr match {
+  private def scanNullIntolerantAttribute(expr: Expression): Seq[Attribute] = expr match {
     case a: Attribute => Seq(a)
-    case _: NullIntolerant | IsNotNull(_: NullIntolerant) =>
-      expr.children.flatMap(scanNullIntolerantExpr)
+    case _: NullIntolerant => expr.children.flatMap(scanNullIntolerantAttribute)
     case _ => Seq.empty[Attribute]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2eb093de/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
index 8068ce9..a191aa8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
@@ -351,6 +351,15 @@ class ConstraintPropagationSuite extends SparkFunSuite {
         IsNotNull(IsNotNull(resolveColumn(tr, "b"))),
         IsNotNull(resolveColumn(tr, "a")),
         IsNotNull(resolveColumn(tr, "c")))))
+
+    verifyConstraints(
+      tr.where('a.attr === 1 && IsNotNull(resolveColumn(tr, "b")) &&
+        IsNotNull(resolveColumn(tr, "c"))).analyze.constraints,
+      ExpressionSet(Seq(
+        resolveColumn(tr, "a") === 1,
+        IsNotNull(resolveColumn(tr, "c")),
+        IsNotNull(resolveColumn(tr, "a")),
+        IsNotNull(resolveColumn(tr, "b")))))
   }
 
   test("infer IsNotNull constraints from non-nullable attributes") {

http://git-wip-us.apache.org/repos/asf/spark/blob/2eb093de/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f5bc878..312cd17 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1697,6 +1697,12 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       expr = "cast((_1 + _2) as boolean)", expectedNonNullableColumns = Seq("_1", "_2"))
   }
 
+  test("SPARK-17897: Fixed IsNotNull Constraint Inference Rule") {
+    val data = Seq[java.lang.Integer](1, null).toDF("key")
+    checkAnswer(data.filter(!$"key".isNotNull), Row(null))
+    checkAnswer(data.filter(!(- $"key").isNotNull), Row(null))
+  }
+
   test("SPARK-17957: outer join + na.fill") {
     val df1 = Seq((1, 2), (2, 3)).toDF("a", "b")
     val df2 = Seq((2, 5), (3, 4)).toDF("a", "c")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org