You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/10/09 22:09:14 UTC

git commit: [SPARK-3711][SQL] Optimize where in clause filter queries

Repository: spark
Updated Branches:
  refs/heads/branch-1.1 09d6a81ad -> 18ef22ab7


[SPARK-3711][SQL] Optimize where in clause filter queries

The In case class is replaced by an InSet class when all the filter values are literals. InSet uses a HashSet instead of a Seq, which gives a significant performance improvement (previously the Seq was searched with a worst-case linear match, via the exists method, because expressions were assumed in the filter list). The maximum improvement should be visible when only a small percentage of a large data set matches the filter list.

Author: Yash Datta <Ya...@guavus.com>

Closes #2561 from saucam/branch-1.1 and squashes the following commits:

4bf2d19 [Yash Datta] SPARK-3711: 1. Fix code style and import order             2. Fix optimization condition             3. Add tests for null in filter list             4. Add test case that optimization is not triggered in case of attributes in filter list
afedbcd [Yash Datta] SPARK-3711: 1. Add test cases for InSet class in ExpressionEvaluationSuite             2. Add class OptimizedInSuite on the lines of ConstantFoldingSuite, for the optimized In clause
0fc902f [Yash Datta] SPARK-3711: UnaryMinus will be handled by constantFolding
bd84c67 [Yash Datta] SPARK-3711: Incorporate review comments. Move optimization of In clause to Optimizer.scala by adding a rule. Add appropriate comments
430f5d1 [Yash Datta] SPARK-3711: Optimize the filter list in case of negative values as well
bee98aa [Yash Datta] SPARK-3711: Optimize where in clause filter queries


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18ef22ab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18ef22ab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18ef22ab

Branch: refs/heads/branch-1.1
Commit: 18ef22ab7d74903a49c92859bca4c2911a0b0a22
Parents: 09d6a81
Author: Yash Datta <Ya...@guavus.com>
Authored: Thu Oct 9 12:59:14 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Thu Oct 9 13:08:35 2014 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/predicates.scala   | 19 ++++-
 .../sql/catalyst/optimizer/Optimizer.scala      | 18 ++++-
 .../expressions/ExpressionEvaluationSuite.scala | 21 ++++++
 .../catalyst/optimizer/OptimizeInSuite.scala    | 76 ++++++++++++++++++++
 4 files changed, 132 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/18ef22ab/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 1313ccd..4f0ba84 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import scala.collection.immutable.HashSet
 import org.apache.spark.sql.catalyst.analysis.UnresolvedException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.types.BooleanType
 
-
 object InterpretedPredicate {
   def apply(expression: Expression, inputSchema: Seq[Attribute]): (Row => Boolean) =
     apply(BindReferences.bindReference(expression, inputSchema))
@@ -95,6 +95,23 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate {
   }
 }
 
+/**
+ * Set-backed form of the In predicate: the evaluated `value` is looked up in the
+ * pre-built `hset`, and `child` is exposed unchanged as this node's children.
+ */
+case class InSet(value: Expression, hset: HashSet[Any], child: Seq[Expression])
+  extends Predicate {
+
+  // TODO: Figure out correct nullability semantics of IN.
+  def nullable: Boolean = true
+
+  def children: Seq[Expression] = child
+
+  override def toString: String = s"$value INSET ${hset.mkString("(", ",", ")")}"
+
+  override def eval(input: Row): Any = hset.contains(value.eval(input))
+}
+
 case class And(left: Expression, right: Expression) extends BinaryPredicate {
   def symbol = "&&"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/18ef22ab/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index ddd4b37..ba9eff7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import scala.collection.immutable.HashSet
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.FullOuter
@@ -38,7 +39,8 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
       BooleanSimplification,
       SimplifyFilters,
       SimplifyCasts,
-      SimplifyCaseConversionExpressions) ::
+      SimplifyCaseConversionExpressions,
+      OptimizeIn) ::
     Batch("Filter Pushdown", FixedPoint(100),
       CombineFilters,
       PushPredicateThroughProject,
@@ -226,6 +228,20 @@ object ConstantFolding extends Rule[LogicalPlan] {
 }
 
 /**
+ * Rewrites `In(value, list)` as `InSet(value, hset, children)` when every entry of `list` is a
+ * [[Literal]], so that the check becomes a hash-set lookup instead of a scan over `list`.
+ */
+object OptimizeIn extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
+    case node: LogicalPlan => node.transformExpressionsDown {
+      case In(probe, items) if items.forall(_.isInstanceOf[Literal]) =>
+        // All entries are Literals; evaluate each with a null input row to collect the constants.
+        InSet(probe, HashSet[Any]() ++ items.map(_.eval(null)), probe +: items)
+    }
+  }
+}
+
+/**
  * Simplifies boolean expressions where the answer can be determined without evaluating both sides.
  * Note that this rule can eliminate expressions that might otherwise have been evaluated and thus
  * is only safe when evaluations of expressions does not result in side effects.

http://git-wip-us.apache.org/repos/asf/spark/blob/18ef22ab/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
index f1df817..75eef39 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
@@ -19,10 +19,13 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.sql.Timestamp
 
+import scala.collection.immutable.HashSet
+
 import org.scalatest.FunSuite
 
 import org.apache.spark.sql.catalyst.types._
 
+
 /* Implicit conversions */
 import org.apache.spark.sql.catalyst.dsl.expressions._
 
@@ -136,6 +139,24 @@ class ExpressionEvaluationSuite extends FunSuite {
     checkEvaluation(In(Literal(1), Seq(Literal(1), Literal(2))) && In(Literal(2), Seq(Literal(1), Literal(2))), true)
   }
 
+  test("INSET") {
+    // hS holds the raw values 1 and 2; nS additionally holds null.
+    val hS = HashSet[Any]() + 1 + 2
+    val nS = hS + null
+    val (one, two, three, nl) = (Literal(1), Literal(2), Literal(3), Literal(null))
+    val s = Seq(one, two)
+    // Children are Expressions, so the null entry is Literal(null), never a raw null reference.
+    val nullS = Seq(one, two, nl)
+
+    checkEvaluation(InSet(one, hS, one +: s), true)
+    checkEvaluation(InSet(two, hS, two +: s), true)
+    checkEvaluation(InSet(two, nS, two +: nullS), true)
+    checkEvaluation(InSet(nl, nS, nl +: nullS), true)
+    checkEvaluation(InSet(three, hS, three +: s), false)
+    checkEvaluation(InSet(three, nS, three +: nullS), false)
+    checkEvaluation(InSet(one, hS, one +: s) && InSet(two, hS, two +: s), true)
+  }
+ 
   test("MaxOf") {
     checkEvaluation(MaxOf(1, 2), 2)
     checkEvaluation(MaxOf(2, 1), 2)

http://git-wip-us.apache.org/repos/asf/spark/blob/18ef22ab/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
new file mode 100644
index 0000000..97a78ec
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.immutable.HashSet
+import org.apache.spark.sql.catalyst.analysis.{EliminateAnalysisOperators, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.types._
+
+// For implicit conversions
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+
+class OptimizeInSuite extends PlanTest {
+
+  // Rule pipeline under test, as two `Once` batches: EliminateAnalysisOperators first, then
+  // ConstantFolding, BooleanSimplification and OptimizeIn.
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("AnalysisNodes", Once,
+        EliminateAnalysisOperators) ::
+      Batch("ConstantFolding", Once,
+        ConstantFolding,
+        BooleanSimplification,
+        OptimizeIn) :: Nil
+  }
+
+  // Input relation declared with the attributes a, b and c.
+  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+
+  test("OptimizedIn test: In clause optimized to InSet") {
+    val attr = UnresolvedAttribute("a")
+    val literals: Seq[Expression] = Seq(Literal(1), Literal(2))
+
+    val originalQuery = testRelation.where(In(attr, literals)).analyze
+    val optimized = Optimize(originalQuery.analyze)
+
+    // Expected plan: the literal values collected into a HashSet, with the attribute
+    // followed by the original literals kept as the children.
+    val correctAnswer =
+      testRelation.where(InSet(attr, HashSet[Any]() + 1 + 2, attr +: literals)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("OptimizedIn test: In clause not optimized in case filter has attributes") {
+    val attr = UnresolvedAttribute("a")
+    val mixed: Seq[Expression] = Seq(Literal(1), Literal(2), UnresolvedAttribute("b"))
+
+    val originalQuery = testRelation.where(In(attr, mixed)).analyze
+    val optimized = Optimize(originalQuery.analyze)
+
+    // Expected plan: the same In filter, since one list entry is not a Literal.
+    val correctAnswer = testRelation.where(In(attr, mixed)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org