You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2016/03/17 00:26:54 UTC

spark git commit: [SPARK-13871][SQL] Support for inferring filters from data constraints

Repository: spark
Updated Branches:
  refs/heads/master b90c0206f -> f96997ba2


[SPARK-13871][SQL] Support for inferring filters from data constraints

## What changes were proposed in this pull request?

This PR generalizes the `NullFiltering` optimizer rule in catalyst to `InferFiltersFromConstraints` that can automatically infer all relevant filters based on an operator's constraints while making sure of 2 things:

(a) no redundant filters are generated, and
(b) filters that do not contribute to any further optimizations are not generated.

## How was this patch tested?

Extended all tests in `InferFiltersFromConstraintsSuite` (that were initially based on `NullFilteringSuite`) to test filter inference in `Filter` and `Join` operators.

In particular, the 2 tests (`single inner join with pre-existing filters: filter out values on either side` and `multiple inner joins: filter out values on all sides on equi-join keys`) attempt to highlight/test the real potential of this rule for join optimization.

Author: Sameer Agarwal <sa...@databricks.com>

Closes #11665 from sameeragarwal/infer-filters.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f96997ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f96997ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f96997ba

Branch: refs/heads/master
Commit: f96997ba244a14c26e85a2475415a762d0c0d0a8
Parents: b90c020
Author: Sameer Agarwal <sa...@databricks.com>
Authored: Wed Mar 16 16:26:51 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Mar 16 16:26:51 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      |  56 ++++-----
 .../InferFiltersFromConstraintsSuite.scala      | 123 +++++++++++++++++++
 .../catalyst/optimizer/NullFilteringSuite.scala | 112 -----------------
 3 files changed, 146 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f96997ba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 2de92d0..76f50a3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -72,6 +72,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
       LimitPushDown,
       ColumnPruning,
       EliminateOperators,
+      InferFiltersFromConstraints,
       // Operator combine
       CollapseRepartition,
       CollapseProject,
@@ -79,7 +80,6 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
       CombineLimits,
       CombineUnions,
       // Constant folding and strength reduction
-      NullFiltering,
       NullPropagation,
       OptimizeIn,
       ConstantFolding,
@@ -607,50 +607,40 @@ object NullPropagation extends Rule[LogicalPlan] {
 }
 
 /**
- * Attempts to eliminate reading (unnecessary) NULL values if they are not required for correctness
- * by inserting isNotNull filters in the query plan. These filters are currently inserted beneath
- * existing Filters and Join operators and are inferred based on their data constraints.
+ * Generate a list of additional filters from an operator's existing constraint but remove those
+ * that are either already part of the operator's condition or are part of the operator's child
+ * constraints. These filters are currently inserted to the existing conditions in the Filter
+ * operators and on either side of Join operators.
  *
  * Note: While this optimization is applicable to all types of join, it primarily benefits Inner and
  * LeftSemi joins.
  */
-object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
+object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case filter @ Filter(condition, child) =>
-      // We generate a list of additional isNotNull filters from the operator's existing constraints
-      // but remove those that are either already part of the filter condition or are part of the
-      // operator's child constraints.
-      val newIsNotNullConstraints = filter.constraints.filter(_.isInstanceOf[IsNotNull]) --
+      val newFilters = filter.constraints --
         (child.constraints ++ splitConjunctivePredicates(condition))
-      if (newIsNotNullConstraints.nonEmpty) {
-        Filter(And(newIsNotNullConstraints.reduce(And), condition), child)
+      if (newFilters.nonEmpty) {
+        Filter(And(newFilters.reduce(And), condition), child)
       } else {
         filter
       }
 
-    case join @ Join(left, right, joinType, condition) =>
-      val leftIsNotNullConstraints = join.constraints
-        .filter(_.isInstanceOf[IsNotNull])
-        .filter(_.references.subsetOf(left.outputSet)) -- left.constraints
-      val rightIsNotNullConstraints =
-        join.constraints
-          .filter(_.isInstanceOf[IsNotNull])
-          .filter(_.references.subsetOf(right.outputSet)) -- right.constraints
-      val newLeftChild = if (leftIsNotNullConstraints.nonEmpty) {
-        Filter(leftIsNotNullConstraints.reduce(And), left)
-      } else {
-        left
-      }
-      val newRightChild = if (rightIsNotNullConstraints.nonEmpty) {
-        Filter(rightIsNotNullConstraints.reduce(And), right)
-      } else {
-        right
-      }
-      if (newLeftChild != left || newRightChild != right) {
-        Join(newLeftChild, newRightChild, joinType, condition)
-      } else {
-        join
+    case join @ Join(left, right, joinType, conditionOpt) =>
+      // Only consider constraints that can be pushed down completely to either the left or the
+      // right child
+      val constraints = join.constraints.filter { c =>
+        c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)}
+      // Remove those constraints that are already enforced by either the left or the right child
+      val additionalConstraints = constraints -- (left.constraints ++ right.constraints)
+      val newConditionOpt = conditionOpt match {
+        case Some(condition) =>
+          val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
+          if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+        case None =>
+          additionalConstraints.reduceOption(And)
       }
+      if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f96997ba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
new file mode 100644
index 0000000..e7fdd5a
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+
+class InferFiltersFromConstraintsSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Batch("InferFilters", FixedPoint(5), InferFiltersFromConstraints) ::
+      Batch("PredicatePushdown", FixedPoint(5), PushPredicateThroughJoin) ::
+      Batch("CombineFilters", FixedPoint(5), CombineFilters) :: Nil
+  }
+
+  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+
+  test("filter: filter out constraints in condition") {
+    val originalQuery = testRelation.where('a === 1 && 'a === 'b).analyze
+    val correctAnswer = testRelation
+      .where(IsNotNull('a) && IsNotNull('b) && 'a === 'b && 'a === 1 && 'b === 1).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("single inner join: filter out values on either side on equi-join keys") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val originalQuery = x.join(y,
+      condition = Some(("x.a".attr === "y.a".attr) && ("x.a".attr === 1) && ("y.c".attr > 5)))
+      .analyze
+    val left = x.where(IsNotNull('a) && "x.a".attr === 1)
+    val right = y.where(IsNotNull('a) && IsNotNull('c) && "y.c".attr > 5 && "y.a".attr === 1)
+    val correctAnswer = left.join(right, condition = Some("x.a".attr === "y.a".attr)).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("single inner join: filter out nulls on either side on non equal keys") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val originalQuery = x.join(y,
+      condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
+      .analyze
+    val left = x.where(IsNotNull('a) && IsNotNull('b) && "x.b".attr === 1)
+    val right = y.where(IsNotNull('a) && IsNotNull('c) && "y.c".attr > 5)
+    val correctAnswer = left.join(right, condition = Some("x.a".attr =!= "y.a".attr)).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("single inner join with pre-existing filters: filter out values on either side") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val originalQuery = x.where('b > 5).join(y.where('a === 10),
+      condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).analyze
+    val left = x.where(IsNotNull('a) && 'a === 10 && IsNotNull('b) && 'b > 5)
+    val right = y.where(IsNotNull('a) && IsNotNull('b) && 'a === 10 && 'b > 5)
+    val correctAnswer = left.join(right,
+      condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("single outer join: no null filters are generated") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val originalQuery = x.join(y, FullOuter,
+      condition = Some("x.a".attr === "y.a".attr)).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, originalQuery)
+  }
+
+  test("multiple inner joins: filter out values on all sides on equi-join keys") {
+    val t1 = testRelation.subquery('t1)
+    val t2 = testRelation.subquery('t2)
+    val t3 = testRelation.subquery('t3)
+    val t4 = testRelation.subquery('t4)
+
+    val originalQuery = t1.where('b > 5)
+      .join(t2, condition = Some("t1.b".attr === "t2.b".attr))
+      .join(t3, condition = Some("t2.b".attr === "t3.b".attr))
+      .join(t4, condition = Some("t3.b".attr === "t4.b".attr)).analyze
+    val correctAnswer = t1.where(IsNotNull('b) && 'b > 5)
+      .join(t2.where(IsNotNull('b) && 'b > 5), condition = Some("t1.b".attr === "t2.b".attr))
+      .join(t3.where(IsNotNull('b) && 'b > 5), condition = Some("t2.b".attr === "t3.b".attr))
+      .join(t4.where(IsNotNull('b) && 'b > 5), condition = Some("t3.b".attr === "t4.b".attr))
+      .analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("inner join with filter: filter out values on all sides on equi-join keys") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+
+    val originalQuery =
+      x.join(y, Inner, Some("x.a".attr === "y.a".attr)).where("x.a".attr > 5).analyze
+    val correctAnswer = x.where(IsNotNull('a) && 'a.attr > 5)
+      .join(y.where(IsNotNull('a) && 'a.attr > 5), Inner, Some("x.a".attr === "y.a".attr)).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f96997ba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
deleted file mode 100644
index 142e4ae..0000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.optimizer
-
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules._
-
-class NullFilteringSuite extends PlanTest {
-
-  object Optimize extends RuleExecutor[LogicalPlan] {
-    val batches = Batch("NullFiltering", Once, NullFiltering) ::
-      Batch("CombineFilters", FixedPoint(5), CombineFilters) :: Nil
-  }
-
-  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
-
-  test("filter: filter out nulls in condition") {
-    val originalQuery = testRelation.where('a === 1).analyze
-    val correctAnswer = testRelation.where(IsNotNull('a) && 'a === 1).analyze
-    val optimized = Optimize.execute(originalQuery)
-    comparePlans(optimized, correctAnswer)
-  }
-
-  test("single inner join: filter out nulls on either side on equi-join keys") {
-    val x = testRelation.subquery('x)
-    val y = testRelation.subquery('y)
-    val originalQuery = x.join(y,
-      condition = Some(("x.a".attr === "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
-      .analyze
-    val left = x.where(IsNotNull('a) && IsNotNull('b))
-    val right = y.where(IsNotNull('a) && IsNotNull('c))
-    val correctAnswer = left.join(right,
-      condition = Some(("x.a".attr === "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
-      .analyze
-    val optimized = Optimize.execute(originalQuery)
-    comparePlans(optimized, correctAnswer)
-  }
-
-  test("single inner join: filter out nulls on either side on non equal keys") {
-    val x = testRelation.subquery('x)
-    val y = testRelation.subquery('y)
-    val originalQuery = x.join(y,
-      condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
-      .analyze
-    val left = x.where(IsNotNull('a) && IsNotNull('b))
-    val right = y.where(IsNotNull('a) && IsNotNull('c))
-    val correctAnswer = left.join(right,
-      condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
-      .analyze
-    val optimized = Optimize.execute(originalQuery)
-    comparePlans(optimized, correctAnswer)
-  }
-
-  test("single inner join with pre-existing filters: filter out nulls on either side") {
-    val x = testRelation.subquery('x)
-    val y = testRelation.subquery('y)
-    val originalQuery = x.where('b > 5).join(y.where('c === 10),
-      condition = Some("x.a".attr === "y.a".attr)).analyze
-    val left = x.where(IsNotNull('a) && IsNotNull('b) && 'b > 5)
-    val right = y.where(IsNotNull('a) && IsNotNull('c) && 'c === 10)
-    val correctAnswer = left.join(right,
-      condition = Some("x.a".attr === "y.a".attr)).analyze
-    val optimized = Optimize.execute(originalQuery)
-    comparePlans(optimized, correctAnswer)
-  }
-
-  test("single outer join: no null filters are generated") {
-    val x = testRelation.subquery('x)
-    val y = testRelation.subquery('y)
-    val originalQuery = x.join(y, FullOuter,
-      condition = Some("x.a".attr === "y.a".attr)).analyze
-    val optimized = Optimize.execute(originalQuery)
-    comparePlans(optimized, originalQuery)
-  }
-
-  test("multiple inner joins: filter out nulls on all sides on equi-join keys") {
-    val t1 = testRelation.subquery('t1)
-    val t2 = testRelation.subquery('t2)
-    val t3 = testRelation.subquery('t3)
-    val t4 = testRelation.subquery('t4)
-
-    val originalQuery = t1
-      .join(t2, condition = Some("t1.b".attr === "t2.b".attr))
-      .join(t3, condition = Some("t2.b".attr === "t3.b".attr))
-      .join(t4, condition = Some("t3.b".attr === "t4.b".attr)).analyze
-    val correctAnswer = t1.where(IsNotNull('b))
-      .join(t2.where(IsNotNull('b)), condition = Some("t1.b".attr === "t2.b".attr))
-      .join(t3.where(IsNotNull('b)), condition = Some("t2.b".attr === "t3.b".attr))
-      .join(t4.where(IsNotNull('b)), condition = Some("t3.b".attr === "t4.b".attr)).analyze
-    val optimized = Optimize.execute(originalQuery)
-    comparePlans(optimized, correctAnswer)
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org