You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/12/30 22:38:30 UTC

spark git commit: [SPARK-4937][SQL] Normalizes conjunctions and disjunctions to eliminate common predicates

Repository: spark
Updated Branches:
  refs/heads/master a75dd83b7 -> 61a99f6a1


[SPARK-4937][SQL] Normalizes conjunctions and disjunctions to eliminate common predicates

This PR is a simplified version of several filter optimization rules introduced in #3778 authored by scwf. Newly introduced optimizations include:

1. `a && a` => `a`
2. `a || a` => `a`
3. `(a || b || c || ...) && (a || b || d || ...)` => `a && b && (c || d || ...)`

The 3rd rule is particularly useful for optimizing the following query, which is planned into a cartesian product

```sql
SELECT *
  FROM t1, t2
 WHERE (t1.key = t2.key AND t1.value > 10)
    OR (t1.key = t2.key AND t2.value < 20)
```

to the following one, which is planned into an equi-join:

```sql
SELECT *
  FROM t1, t2
 WHERE t1.key = t2.key
   AND (t1.value > 10 OR t2.value < 20)
```

The example above is quite artificial, but common predicates are likely to appear in real life complex queries (like the one mentioned in #3778).

A difference between this PR and #3778 is that these optimizations are not limited to `Filter`, but are generalized to all logical plan nodes. Thanks to scwf for bringing up these optimizations, and chenghao-intel for the generalization suggestion.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/3784)
<!-- Reviewable:end -->

Author: Cheng Lian <li...@databricks.com>

Closes #3784 from liancheng/normalize-filters and squashes the following commits:

caca560 [Cheng Lian] Moves filter normalization into BooleanSimplification rule
4ab3a58 [Cheng Lian] Fixes test failure, adds more tests
5d54349 [Cheng Lian] Fixes typo in comment
2abbf8e [Cheng Lian] Forgot our sacred Apache licence header...
cf95639 [Cheng Lian] Adds an optimization rule for filter normalization


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61a99f6a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61a99f6a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61a99f6a

Branch: refs/heads/master
Commit: 61a99f6a11d85e931e7d60f9ab4370b3b40a52ef
Parents: a75dd83
Author: Cheng Lian <li...@databricks.com>
Authored: Tue Dec 30 13:38:27 2014 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Dec 30 13:38:27 2014 -0800

----------------------------------------------------------------------
 .../sql/catalyst/expressions/predicates.scala   |  9 ++-
 .../sql/catalyst/optimizer/Optimizer.scala      | 27 ++++++--
 .../optimizer/NormalizeFiltersSuite.scala       | 72 ++++++++++++++++++++
 .../columnar/PartitionBatchPruningSuite.scala   | 10 ++-
 4 files changed, 110 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/61a99f6a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 94b6fb0..cb5ff67 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import scala.collection.immutable.HashSet
 import org.apache.spark.sql.catalyst.analysis.UnresolvedException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.types.BooleanType
@@ -48,6 +47,14 @@ trait PredicateHelper {
     }
   }
 
+  protected def splitDisjunctivePredicates(condition: Expression): Seq[Expression] = {
+    condition match {
+      case Or(cond1, cond2) =>
+        splitDisjunctivePredicates(cond1) ++ splitDisjunctivePredicates(cond2)
+      case other => other :: Nil
+    }
+  }
+
   /**
    * Returns true if `expr` can be evaluated using only the output of `plan`.  This method
    * can be used to determine when is is acceptable to move expression evaluation within a query

http://git-wip-us.apache.org/repos/asf/spark/blob/61a99f6a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0f2eae6..cd31379 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -294,11 +294,16 @@ object OptimizeIn extends Rule[LogicalPlan] {
 }
 
 /**
- * Simplifies boolean expressions where the answer can be determined without evaluating both sides.
+ * Simplifies boolean expressions:
+ *
+ * 1. Simplifies expressions whose answer can be determined without evaluating both sides.
+ * 2. Eliminates / extracts common factors.
+ * 3. Removes `Not` operator.
+ *
  * Note that this rule can eliminate expressions that might otherwise have been evaluated and thus
  * is only safe when evaluations of expressions does not result in side effects.
  */
-object BooleanSimplification extends Rule[LogicalPlan] {
+object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsUp {
       case and @ And(left, right) =>
@@ -307,7 +312,9 @@ object BooleanSimplification extends Rule[LogicalPlan] {
           case (l, Literal(true, BooleanType)) => l
           case (Literal(false, BooleanType), _) => Literal(false)
           case (_, Literal(false, BooleanType)) => Literal(false)
-          case (_, _) => and
+          // a && a && a ... => a
+          case _ if splitConjunctivePredicates(and).distinct.size == 1 => left
+          case _ => and
         }
 
       case or @ Or(left, right) =>
@@ -316,7 +323,19 @@ object BooleanSimplification extends Rule[LogicalPlan] {
           case (_, Literal(true, BooleanType)) => Literal(true)
           case (Literal(false, BooleanType), r) => r
           case (l, Literal(false, BooleanType)) => l
-          case (_, _) => or
+          // a || a || a ... => a
+          case _ if splitDisjunctivePredicates(or).distinct.size == 1 => left
+          // (a && b && c && ...) || (a && b && d && ...) => a && b && (c || d || ...)
+          case _ =>
+            val lhsSet = splitConjunctivePredicates(left).toSet
+            val rhsSet = splitConjunctivePredicates(right).toSet
+            val common = lhsSet.intersect(rhsSet)
+
+            (lhsSet.diff(common).reduceOption(And) ++ rhsSet.diff(common).reduceOption(And))
+              .reduceOption(Or)
+              .map(_ :: common.toList)
+              .getOrElse(common.toList)
+              .reduce(And)
         }
 
       case not @ Not(exp) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/61a99f6a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFiltersSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFiltersSuite.scala
new file mode 100644
index 0000000..906300d
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFiltersSuite.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
+import org.apache.spark.sql.catalyst.expressions.{And, Expression, Or}
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+// For implicit conversions
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+
+class NormalizeFiltersSuite extends PlanTest {
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Seq(
+      Batch("AnalysisNodes", Once,
+        EliminateAnalysisOperators),
+      Batch("NormalizeFilters", FixedPoint(100),
+        BooleanSimplification,
+        SimplifyFilters))
+  }
+
+  val relation = LocalRelation('a.int, 'b.int, 'c.string)
+
+  def checkExpression(original: Expression, expected: Expression): Unit = {
+    val actual = Optimize(relation.where(original)).collect { case f: Filter => f.condition }.head
+    val result = (actual, expected) match {
+      case (And(l1, r1), And(l2, r2)) => (l1 == l2 && r1 == r2) || (l1 == r2 && l2 == r1)
+      case (Or (l1, r1), Or (l2, r2)) => (l1 == l2 && r1 == r2) || (l1 == r2 && l2 == r1)
+      case (lhs, rhs) => lhs fastEquals rhs
+    }
+
+    assert(result, s"$actual isn't equivalent to $expected")
+  }
+
+  test("a && a => a") {
+    checkExpression('a === 1 && 'a === 1, 'a === 1)
+    checkExpression('a === 1 && 'a === 1 && 'a === 1, 'a === 1)
+  }
+
+  test("a || a => a") {
+    checkExpression('a === 1 || 'a === 1, 'a === 1)
+    checkExpression('a === 1 || 'a === 1 || 'a === 1, 'a === 1)
+  }
+
+  test("(a && b) || (a && c) => a && (b || c)") {
+    checkExpression(
+      ('a === 1 && 'a < 10) || ('a > 2 && 'a === 1),
+      ('a === 1) && ('a < 10 || 'a > 2))
+
+    checkExpression(
+      ('a < 1 && 'b > 2 && 'c.isNull) || ('a < 1 && 'c === "hello" && 'b > 2),
+      ('c.isNull || 'c === "hello") && 'a < 1 && 'b > 2)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/61a99f6a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
index 82afa31..1915c25 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
@@ -105,7 +105,9 @@ class PartitionBatchPruningSuite extends FunSuite with BeforeAndAfterAll with Be
 
     test(query) {
       val schemaRdd = sql(query)
-      assertResult(expectedQueryResult.toArray, "Wrong query result") {
+      val queryExecution = schemaRdd.queryExecution
+
+      assertResult(expectedQueryResult.toArray, s"Wrong query result: $queryExecution") {
         schemaRdd.collect().map(_.head).toArray
       }
 
@@ -113,8 +115,10 @@ class PartitionBatchPruningSuite extends FunSuite with BeforeAndAfterAll with Be
         case in: InMemoryColumnarTableScan => (in.readPartitions.value, in.readBatches.value)
       }.head
 
-      assert(readBatches === expectedReadBatches, "Wrong number of read batches")
-      assert(readPartitions === expectedReadPartitions, "Wrong number of read partitions")
+      assert(readBatches === expectedReadBatches, s"Wrong number of read batches: $queryExecution")
+      assert(
+        readPartitions === expectedReadPartitions,
+        s"Wrong number of read partitions: $queryExecution")
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org