Posted to commits@spark.apache.org by ma...@apache.org on 2016/03/09 21:51:41 UTC

spark git commit: [SPARK-13527][SQL] Prune Filters based on Constraints

Repository: spark
Updated Branches:
  refs/heads/master 3dc9ae2e1 -> c6aa356cd


[SPARK-13527][SQL] Prune Filters based on Constraints

#### What changes were proposed in this pull request?

Remove all deterministic conditions in a [[Filter]] that are already guaranteed by the constraints on the child's output.

For example, the first query below can be simplified to the second one:

```scala
    val queryWithUselessFilter = tr1
      .where("tr1.a".attr > 10 || "tr1.c".attr < 10)
      .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.a".attr))
      .where(
        ("tr1.a".attr > 10 || "tr1.c".attr < 10) &&
        'd.attr < 100 &&
        "tr2.a".attr === "tr1.a".attr)
```
```scala
    val query = tr1
      .where("tr1.a".attr > 10 || "tr1.c".attr < 10)
      .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.a".attr))
```
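At its core, the rule splits a filter condition into its conjuncts and drops every deterministic conjunct that the child's constraints already guarantee. A minimal sketch of that logic, condensed from the `PruneFilters` rule in the diff below (`splitConjunctivePredicates` and `constraints` are the actual Catalyst APIs it uses; the object and method names here are only for illustration):

```scala
import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}

// Sketch only: condensed from the PruneFilters rule added by this patch.
object PruneSketch extends PredicateHelper {
  def prune(f: Filter): LogicalPlan = {
    val Filter(condition, child) = f
    // Partition the conjuncts: those implied by the child's constraints
    // can be pruned; the rest must stay.
    val (pruned, remaining) = splitConjunctivePredicates(condition).partition {
      cond => cond.deterministic && child.constraints.contains(cond)
    }
    if (pruned.isEmpty) f                      // nothing implied: keep the filter
    else if (remaining.isEmpty) child          // everything implied: drop the filter
    else Filter(remaining.reduce(And), child)  // keep only the leftover conjuncts
  }
}
```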
#### How was this patch tested?

Six test cases are added in the new `PruneFiltersSuite`.

Author: gatorsmile <ga...@gmail.com>

Closes #11406 from gatorsmile/FilterRemoval.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6aa356c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6aa356c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6aa356c

Branch: refs/heads/master
Commit: c6aa356cd831ea2d159568b699bd5b791f3d8f25
Parents: 3dc9ae2
Author: gatorsmile <ga...@gmail.com>
Authored: Wed Mar 9 12:50:55 2016 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Wed Mar 9 12:50:55 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      |  26 +++-
 .../optimizer/BooleanSimplificationSuite.scala  |   2 +-
 .../catalyst/optimizer/PruneFiltersSuite.scala  | 136 +++++++++++++++++++
 .../catalyst/optimizer/SetOperationSuite.scala  |   2 +-
 .../datasources/parquet/ParquetFilters.scala    |   2 +-
 5 files changed, 160 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c6aa356c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 586bf3d..650b4ee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -86,7 +86,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
       BooleanSimplification,
       SimplifyConditionals,
       RemoveDispensableExpressions,
-      SimplifyFilters,
+      PruneFilters,
       SimplifyCasts,
       SimplifyCaseConversionExpressions,
       EliminateSerialization) ::
@@ -827,11 +827,12 @@ object CombineFilters extends Rule[LogicalPlan] {
 }
 
 /**
- * Removes filters that can be evaluated trivially.  This is done either by eliding the filter for
- * cases where it will always evaluate to `true`, or substituting a dummy empty relation when the
- * filter will always evaluate to `false`.
+ * Removes filters that can be evaluated trivially.  This can be done through the following ways:
+ * 1) by eliding the filter for cases where it will always evaluate to `true`.
+ * 2) by substituting a dummy empty relation when the filter will always evaluate to `false`.
+ * 3) by eliminating the always-true conditions given the constraints on the child's output.
  */
-object SimplifyFilters extends Rule[LogicalPlan] {
+object PruneFilters extends Rule[LogicalPlan] with PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     // If the filter condition always evaluate to true, remove the filter.
     case Filter(Literal(true, BooleanType), child) => child
@@ -839,6 +840,21 @@ object SimplifyFilters extends Rule[LogicalPlan] {
     // replace the input with an empty relation.
     case Filter(Literal(null, _), child) => LocalRelation(child.output, data = Seq.empty)
     case Filter(Literal(false, BooleanType), child) => LocalRelation(child.output, data = Seq.empty)
+    // If any deterministic condition is guaranteed to be true given the constraints on the child's
+    // output, remove the condition
+    case f @ Filter(fc, p: LogicalPlan) =>
+      val (prunedPredicates, remainingPredicates) =
+        splitConjunctivePredicates(fc).partition { cond =>
+          cond.deterministic && p.constraints.contains(cond)
+        }
+      if (prunedPredicates.isEmpty) {
+        f
+      } else if (remainingPredicates.isEmpty) {
+        p
+      } else {
+        val newCond = remainingPredicates.reduce(And)
+        Filter(newCond, p)
+      }
   }
 }
 

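For context, the `constraints` set that the new case consults is propagated bottom-up through the plan by operators such as `Filter` and `Join`. A hedged sketch of how it can be inspected, using the same Catalyst test DSL as the suites below:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

val tr = LocalRelation('a.int, 'b.int)
val filtered = tr.where('a.attr > 10).analyze
// The filter's output is known to satisfy 'a > 10 (plus any constraints
// Catalyst infers from it), so an identical predicate above this node
// is exactly what PruneFilters can now remove.
println(filtered.constraints)
```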
http://git-wip-us.apache.org/repos/asf/spark/blob/c6aa356c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index 3e52441..da43751 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -36,7 +36,7 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
         NullPropagation,
         ConstantFolding,
         BooleanSimplification,
-        SimplifyFilters) :: Nil
+        PruneFilters) :: Nil
   }
 
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int, 'd.string)

http://git-wip-us.apache.org/repos/asf/spark/blob/c6aa356c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
new file mode 100644
index 0000000..0ee7cf9
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+
+class PruneFiltersSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Subqueries", Once,
+        EliminateSubqueryAliases) ::
+      Batch("Filter Pushdown and Pruning", Once,
+        CombineFilters,
+        PruneFilters,
+        PushPredicateThroughProject,
+        PushPredicateThroughJoin) :: Nil
+  }
+
+  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+
+  test("Constraints of isNull + LeftOuter") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+
+    val query = x.where("x.b".attr.isNull).join(y, LeftOuter)
+    val queryWithUselessFilter = query.where("x.b".attr.isNull)
+
+    val optimized = Optimize.execute(queryWithUselessFilter.analyze)
+    val correctAnswer = query.analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("Constraints of unionall") {
+    val tr1 = LocalRelation('a.int, 'b.int, 'c.int)
+    val tr2 = LocalRelation('d.int, 'e.int, 'f.int)
+    val tr3 = LocalRelation('g.int, 'h.int, 'i.int)
+
+    val query =
+      tr1.where('a.attr > 10)
+        .unionAll(tr2.where('d.attr > 10)
+        .unionAll(tr3.where('g.attr > 10)))
+    val queryWithUselessFilter = query.where('a.attr > 10)
+
+    val optimized = Optimize.execute(queryWithUselessFilter.analyze)
+    val correctAnswer = query.analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("Pruning multiple constraints in the same run") {
+    val tr1 = LocalRelation('a.int, 'b.int, 'c.int).subquery('tr1)
+    val tr2 = LocalRelation('a.int, 'd.int, 'e.int).subquery('tr2)
+
+    val query = tr1
+      .where("tr1.a".attr > 10 || "tr1.c".attr < 10)
+      .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.a".attr))
+    // different order of "tr2.a" and "tr1.a"
+    val queryWithUselessFilter =
+      query.where(
+        ("tr1.a".attr > 10 || "tr1.c".attr < 10) &&
+          'd.attr < 100 &&
+          "tr2.a".attr === "tr1.a".attr)
+
+    val optimized = Optimize.execute(queryWithUselessFilter.analyze)
+    val correctAnswer = query.analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("Partial pruning") {
+    val tr1 = LocalRelation('a.int, 'b.int, 'c.int).subquery('tr1)
+    val tr2 = LocalRelation('a.int, 'd.int, 'e.int).subquery('tr2)
+
+    // One of the filter condition does not exist in the constraints of its child
+    // Thus, the filter is not removed
+    val query = tr1
+      .where("tr1.a".attr > 10)
+      .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.d".attr))
+    val queryWithExtraFilters =
+      query.where("tr1.a".attr > 10 && 'd.attr < 100 && "tr1.a".attr === "tr2.a".attr)
+
+    val optimized = Optimize.execute(queryWithExtraFilters.analyze)
+    val correctAnswer = tr1
+      .where("tr1.a".attr > 10)
+      .join(tr2.where('d.attr < 100),
+        Inner,
+        Some("tr1.a".attr === "tr2.a".attr && "tr1.a".attr === "tr2.d".attr)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("No predicate is pruned") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+
+    val query = x.where("x.b".attr.isNull).join(y, LeftOuter)
+    val queryWithExtraFilters = query.where("x.b".attr.isNotNull)
+
+    val optimized = Optimize.execute(queryWithExtraFilters.analyze)
+    val correctAnswer =
+      testRelation.where("b".attr.isNull).where("b".attr.isNotNull)
+        .join(testRelation, LeftOuter).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("Nondeterministic predicate is not pruned") {
+    val originalQuery = testRelation.where(Rand(10) > 5).select('a).where(Rand(10) > 5).analyze
+    val optimized = Optimize.execute(originalQuery)
+    val correctAnswer = testRelation.where(Rand(10) > 5).where(Rand(10) > 5).select('a).analyze
+    comparePlans(optimized, correctAnswer)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c6aa356c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
index 50f3b51..b08cdc8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
@@ -32,7 +32,7 @@ class SetOperationSuite extends PlanTest {
       Batch("Union Pushdown", Once,
         CombineUnions,
         SetOperationPushDown,
-        SimplifyFilters) :: Nil
+        PruneFilters) :: Nil
   }
 
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

http://git-wip-us.apache.org/repos/asf/spark/blob/c6aa356c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 5a5cb5c..95afdc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -234,7 +234,7 @@ private[sql] object ParquetFilters {
     //
     // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
     // which can be casted to `false` implicitly. Please refer to the `eval` method of these
-    // operators and the `SimplifyFilters` rule for details.
+    // operators and the `PruneFilters` rule for details.
 
     // Hyukjin:
     // I added [[EqualNullSafe]] with [[org.apache.parquet.filter2.predicate.Operators.Eq]].
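
To illustrate the null semantics the comment above relies on, a quick sketch against Catalyst's expression API (not part of this patch):

```scala
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
import org.apache.spark.sql.types.IntegerType

// Any comparison with NULL evaluates to null, and a Filter drops rows
// whose condition evaluates to null, so the net effect is "false".
val cmp = EqualTo(Literal(1), Literal(null, IntegerType))
assert(cmp.eval() == null)
```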


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org