You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/01/16 23:01:28 UTC

spark git commit: [SPARK-4937][SQL] Adding optimization to simplify the And, Or condition in spark sql

Repository: spark
Updated Branches:
  refs/heads/master fd3a8a1d1 -> ee1c1f3a0


[SPARK-4937][SQL] Adding optimization to simplify the  And, Or condition in spark sql

Adding optimization to simplify the And/Or condition in spark sql.

There are two kinds of optimization:
1 Numeric condition optimization, such as:
a < 3 && a > 5 ---- False
a < 1 || a > 0 ---- True
a > 3 && a > 5 => a > 5
(a < 2 || b > 5) && a < 2 => a < 2

2 Optimizing some queries from a cartesian product into an equi-join, such as this SQL (one of the hive-testbench queries):
```
select
sum(l_extendedprice* (1 - l_discount)) as revenue
from
lineitem,
part
where
(
p_partkey = l_partkey
and p_brand = 'Brand#32'
and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
and l_quantity >= 7 and l_quantity <= 7 + 10
and p_size between 1 and 5
and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON'
)
or
(
p_partkey = l_partkey
and p_brand = 'Brand#35'
and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
and l_quantity >= 15 and l_quantity <= 15 + 10
and p_size between 1 and 10
and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON'
)
or
(
p_partkey = l_partkey
and p_brand = 'Brand#24'
and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
and l_quantity >= 26 and l_quantity <= 26 + 10
and p_size between 1 and 15
and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON'
)
```
It has a repeated expression in each Or branch, so we can optimize it using ``` (a && b) || (a && c) = a && (b || c)```
Before optimization, this SQL hung in my local test, and the physical plan was:
![image](https://cloud.githubusercontent.com/assets/7018048/5539175/31cf38e8-8af9-11e4-95e3-336f9b3da4a4.png)

After optimization, this SQL runs successfully in 20+ seconds, and its physical plan is:
![image](https://cloud.githubusercontent.com/assets/7018048/5539176/39a558e0-8af9-11e4-912b-93de94b20075.png)

This PR focuses on the second optimization and some simple cases of the first. For complex numeric condition optimization, I will make a follow-up PR.

Author: scwf <wa...@huawei.com>
Author: wangfei <wa...@huawei.com>

Closes #3778 from scwf/filter1 and squashes the following commits:

58bcbc2 [scwf] minor format fix
9570211 [scwf] conflicts fix
527e6ce [scwf] minor comment improvements
5c6f134 [scwf] remove numeric optimizations and move to BooleanSimplification
546a82b [wangfei] style fix
825fa69 [wangfei] adding more tests
a001e8c [wangfei] revert pom changes
32a595b [scwf] improvement and test fix
e99a26c [wangfei] refactory And/Or optimization to make it more readable and clean


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee1c1f3a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee1c1f3a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee1c1f3a

Branch: refs/heads/master
Commit: ee1c1f3a04dfe80843432e349f01178e47f02443
Parents: fd3a8a1
Author: scwf <wa...@huawei.com>
Authored: Fri Jan 16 14:01:22 2015 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Fri Jan 16 14:01:22 2015 -0800

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 57 ++++++++----
 .../optimizer/BooleanSimplificationSuite.scala  | 92 ++++++++++++++++++++
 .../optimizer/NormalizeFiltersSuite.scala       | 72 ---------------
 3 files changed, 131 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ee1c1f3a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d4a4c35..f3acb70 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -294,13 +294,10 @@ object OptimizeIn extends Rule[LogicalPlan] {
 
 /**
  * Simplifies boolean expressions:
- *
  * 1. Simplifies expressions whose answer can be determined without evaluating both sides.
  * 2. Eliminates / extracts common factors.
- * 3. Removes `Not` operator.
- *
- * Note that this rule can eliminate expressions that might otherwise have been evaluated and thus
- * is only safe when evaluations of expressions does not result in side effects.
+ * 3. Merge same expressions
+ * 4. Removes `Not` operator.
  */
 object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -311,9 +308,26 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper {
           case (l, Literal(true, BooleanType)) => l
           case (Literal(false, BooleanType), _) => Literal(false)
           case (_, Literal(false, BooleanType)) => Literal(false)
-          // a && a && a ... => a
-          case _ if splitConjunctivePredicates(and).distinct.size == 1 => left
-          case _ => and
+          // a && a => a
+          case (l, r) if l fastEquals r => l
+          case (_, _) =>
+            val lhsSet = splitDisjunctivePredicates(left).toSet
+            val rhsSet = splitDisjunctivePredicates(right).toSet
+            val common = lhsSet.intersect(rhsSet)
+            val ldiff = lhsSet.diff(common)
+            val rdiff = rhsSet.diff(common)
+            if (ldiff.size == 0 || rdiff.size == 0) {
+              // a && (a || b)
+              common.reduce(Or)
+            } else {
+              // (a || b || c || ...) && (a || b || d || ...) && (a || b || e || ...) ... =>
+              // (a || b) || ((c || ...) && (d || ...) && (e || ...) && ...)
+              (ldiff.reduceOption(Or) ++ rdiff.reduceOption(Or))
+                .reduceOption(And)
+                .map(_ :: common.toList)
+                .getOrElse(common.toList)
+                .reduce(Or)
+            }
         }
 
       case or @ Or(left, right) =>
@@ -322,19 +336,26 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper {
           case (_, Literal(true, BooleanType)) => Literal(true)
           case (Literal(false, BooleanType), r) => r
           case (l, Literal(false, BooleanType)) => l
-          // a || a || a ... => a
-          case _ if splitDisjunctivePredicates(or).distinct.size == 1 => left
-          // (a && b && c && ...) || (a && b && d && ...) => a && b && (c || d || ...)
-          case _ =>
+          // a || a => a
+          case (l, r) if l fastEquals r => l
+          case (_, _) =>
             val lhsSet = splitConjunctivePredicates(left).toSet
             val rhsSet = splitConjunctivePredicates(right).toSet
             val common = lhsSet.intersect(rhsSet)
-
-            (lhsSet.diff(common).reduceOption(And) ++ rhsSet.diff(common).reduceOption(And))
-              .reduceOption(Or)
-              .map(_ :: common.toList)
-              .getOrElse(common.toList)
-              .reduce(And)
+            val ldiff = lhsSet.diff(common)
+            val rdiff = rhsSet.diff(common)
+            if ( ldiff.size == 0 || rdiff.size == 0) {
+              // a || (b && a)
+              common.reduce(And)
+            } else {
+              // (a && b && c && ...) || (a && b && d && ...) || (a && b && e && ...) ... =>
+              // a && b && ((c && ...) || (d && ...) || (e && ...) || ...)
+              (ldiff.reduceOption(And) ++ rdiff.reduceOption(And))
+                .reduceOption(Or)
+                .map(_ :: common.toList)
+                .getOrElse(common.toList)
+                .reduce(And)
+            }
         }
 
       case not @ Not(exp) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/ee1c1f3a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
new file mode 100644
index 0000000..a0863da
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
+import org.apache.spark.sql.catalyst.expressions.{Literal, Expression}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+
+class BooleanSimplificationSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("AnalysisNodes", Once,
+        EliminateAnalysisOperators) ::
+      Batch("Constant Folding", FixedPoint(50),
+        NullPropagation,
+        ConstantFolding,
+        BooleanSimplification,
+        SimplifyFilters) :: Nil
+  }
+
+  val testRelation = LocalRelation('a.int, 'b.int, 'c.int, 'd.string)
+
+  def checkCondition(originCondition: Expression, optimizedCondition: Expression): Unit = {
+    val originQuery = testRelation.where(originCondition).analyze
+    val optimized = Optimize(originQuery)
+    val expected = testRelation.where(optimizedCondition).analyze
+    comparePlans(optimized, expected)
+  }
+
+  test("a && a => a") {
+    checkCondition(Literal(1) < 'a && Literal(1) < 'a, Literal(1) < 'a)
+    checkCondition(Literal(1) < 'a && Literal(1) < 'a && Literal(1) < 'a, Literal(1) < 'a)
+  }
+
+  test("a || a => a") {
+    checkCondition(Literal(1) < 'a || Literal(1) < 'a, Literal(1) < 'a)
+    checkCondition(Literal(1) < 'a || Literal(1) < 'a || Literal(1) < 'a, Literal(1) < 'a)
+  }
+
+  test("(a && b && c && ...) || (a && b && d && ...) || (a && b && e && ...) ...") {
+    checkCondition('b > 3 || 'c > 5, 'b > 3 || 'c > 5)
+
+    checkCondition(('a < 2 && 'a > 3 && 'b > 5) || 'a < 2,  'a < 2)
+
+    checkCondition('a < 2 || ('a < 2 && 'a > 3 && 'b > 5),  'a < 2)
+
+    val input = ('a === 'b && 'b > 3 && 'c > 2) ||
+      ('a === 'b && 'c < 1 && 'a === 5) ||
+      ('a === 'b && 'b < 5 && 'a > 1)
+
+    val expected =
+      (((('b > 3) && ('c > 2)) ||
+        (('c < 1) && ('a === 5))) ||
+        (('b < 5) && ('a > 1))) && ('a === 'b)
+    checkCondition(input, expected)
+
+  }
+
+  test("(a || b || c || ...) && (a || b || d || ...) && (a || b || e || ...) ...") {
+    checkCondition('b > 3 && 'c > 5, 'b > 3 && 'c > 5)
+
+    checkCondition(('a < 2 || 'a > 3 || 'b > 5) && 'a < 2, 'a < 2)
+
+    checkCondition('a < 2 && ('a < 2 || 'a > 3 || 'b > 5) , 'a < 2)
+
+    checkCondition(('a < 2 || 'b > 3) && ('a < 2 || 'c > 5), ('b > 3 && 'c > 5) || 'a < 2)
+
+    var input: Expression = ('a === 'b || 'b > 3) && ('a === 'b || 'a > 3) && ('a === 'b || 'a < 5)
+    var expected: Expression = ('b > 3 && 'a > 3 && 'a < 5) || 'a === 'b
+    checkCondition(input, expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ee1c1f3a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFiltersSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFiltersSuite.scala
deleted file mode 100644
index 906300d..0000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFiltersSuite.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.optimizer
-
-import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
-import org.apache.spark.sql.catalyst.expressions.{And, Expression, Or}
-import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan}
-import org.apache.spark.sql.catalyst.rules.RuleExecutor
-
-// For implicit conversions
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.dsl.plans._
-
-class NormalizeFiltersSuite extends PlanTest {
-  object Optimize extends RuleExecutor[LogicalPlan] {
-    val batches = Seq(
-      Batch("AnalysisNodes", Once,
-        EliminateAnalysisOperators),
-      Batch("NormalizeFilters", FixedPoint(100),
-        BooleanSimplification,
-        SimplifyFilters))
-  }
-
-  val relation = LocalRelation('a.int, 'b.int, 'c.string)
-
-  def checkExpression(original: Expression, expected: Expression): Unit = {
-    val actual = Optimize(relation.where(original)).collect { case f: Filter => f.condition }.head
-    val result = (actual, expected) match {
-      case (And(l1, r1), And(l2, r2)) => (l1 == l2 && r1 == r2) || (l1 == r2 && l2 == r1)
-      case (Or (l1, r1), Or (l2, r2)) => (l1 == l2 && r1 == r2) || (l1 == r2 && l2 == r1)
-      case (lhs, rhs) => lhs fastEquals rhs
-    }
-
-    assert(result, s"$actual isn't equivalent to $expected")
-  }
-
-  test("a && a => a") {
-    checkExpression('a === 1 && 'a === 1, 'a === 1)
-    checkExpression('a === 1 && 'a === 1 && 'a === 1, 'a === 1)
-  }
-
-  test("a || a => a") {
-    checkExpression('a === 1 || 'a === 1, 'a === 1)
-    checkExpression('a === 1 || 'a === 1 || 'a === 1, 'a === 1)
-  }
-
-  test("(a && b) || (a && c) => a && (b || c)") {
-    checkExpression(
-      ('a === 1 && 'a < 10) || ('a > 2 && 'a === 1),
-      ('a === 1) && ('a < 10 || 'a > 2))
-
-    checkExpression(
-      ('a < 1 && 'b > 2 && 'c.isNull) || ('a < 1 && 'c === "hello" && 'b > 2),
-      ('c.isNull || 'c === "hello") && 'a < 1 && 'b > 2)
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org