Posted to commits@spark.apache.org by yh...@apache.org on 2016/03/09 00:40:49 UTC

spark git commit: [SPARK-13668][SQL] Reorder filter/join predicates to short-circuit isNotNull checks

Repository: spark
Updated Branches:
  refs/heads/master 1e2884059 -> e430614ea


[SPARK-13668][SQL] Reorder filter/join predicates to short-circuit isNotNull checks

## What changes were proposed in this pull request?

If a filter predicate or a join condition contains `IsNotNull` checks, we should reorder the conjuncts so that these non-nullability checks are evaluated before the rest of the predicates.

For example, if a filter predicate is of the form `a > 5 && isNotNull(b)`, we should rewrite this as `isNotNull(b) && a > 5` during physical plan generation.
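
As a minimal standalone sketch of the intuition (the `Pred` ADT below is hypothetical and stands in for Catalyst's `Expression` tree; the actual patch works on real Catalyst expressions via `PredicateHelper`):

```scala
// Minimal sketch of the reordering idea. `Pred` is a hypothetical stand-in
// for Catalyst's Expression tree, not part of the patch.
sealed trait Pred
case class NotNullCheck(column: String) extends Pred
case class OtherPred(sql: String) extends Pred

object ReorderSketch {
  // Move the cheap non-nullability checks to the front; a stable partition
  // keeps the relative order within each group unchanged.
  def reorder(conjuncts: Seq[Pred]): Seq[Pred] = {
    val (notNulls, rest) = conjuncts.partition(_.isInstanceOf[NotNullCheck])
    notNulls ++ rest
  }

  def main(args: Array[String]): Unit = {
    val before = Seq(OtherPred("a > 5"), NotNullCheck("b"))
    // Prints: List(NotNullCheck(b), OtherPred(a > 5))
    println(reorder(before))
  }
}
```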

## How was this patch tested?

New unit tests in `ReorderedPredicateSuite` verify the physical plan for both filters and joins.

Author: Sameer Agarwal <sa...@databricks.com>

Closes #11511 from sameeragarwal/reorder-isnotnull.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e430614e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e430614e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e430614e

Branch: refs/heads/master
Commit: e430614eae53c8864b31a1dc64db83e27100d1d9
Parents: 1e28840
Author: Sameer Agarwal <sa...@databricks.com>
Authored: Tue Mar 8 15:40:45 2016 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Mar 8 15:40:45 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/planning/QueryPlanner.scala    |  24 ++++-
 .../spark/sql/execution/SparkStrategies.scala   |  37 ++++---
 .../sql/execution/ReorderedPredicateSuite.scala | 103 +++++++++++++++++++
 3 files changed, 150 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e430614e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
index 56a3dd0..1e4523e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.catalyst.planning
 
 import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions.{And, Expression, IsNotNull, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.trees.TreeNode
 
@@ -26,8 +28,28 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
 * be used for execution. If this strategy does not apply to the given logical operation then an
  * empty list should be returned.
  */
-abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging {
+abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]]
+  extends PredicateHelper with Logging {
+
   def apply(plan: LogicalPlan): Seq[PhysicalPlan]
+
+  // Attempts to reorder the individual conjunctive predicates in an expression to short-circuit
+  // the evaluation of relatively cheaper checks (e.g., checking for nullability) before others.
+  protected def reorderPredicates(expr: Expression): Expression = {
+    splitConjunctivePredicates(expr)
+      .sortWith((x, _) => x.isInstanceOf[IsNotNull])
+      .reduce(And)
+  }
+
+  // Wrapper around reorderPredicates(expr: Expression) to reorder optional conditions in joins
+  protected def reorderPredicates(exprOpt: Option[Expression]): Option[Expression] = {
+    exprOpt match {
+      case Some(expr) =>
+        Option(reorderPredicates(expr))
+      case None =>
+        exprOpt
+    }
+  }
 }
 
 /**
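
A note on the `sortWith((x, _) => x.isInstanceOf[IsNotNull])` idiom above: it leans on the stability of Scala's underlying sort to keep the relative order within each group unchanged, which is exactly what the new test suite asserts. As a sketch only (not what the patch does), the same reordering can be expressed with `partition`, which is stable by construction:

```scala
// Sketch only: an equivalent reordering via a stable partition. `preds`
// stands for the output of splitConjunctivePredicates(expr); isCheap marks
// the predicates to hoist (here, the IsNotNull checks).
def reorderStable[A](preds: Seq[A])(isCheap: A => Boolean): Seq[A] = {
  val (cheap, rest) = preds.partition(isCheap)
  cheap ++ rest // cheap checks first; relative order preserved in each group
}

// reorderStable(Seq("a > 5", "isNotNull(b)"))(_.startsWith("isNotNull"))
// == List("isNotNull(b)", "a > 5")
```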

http://git-wip-us.apache.org/repos/asf/spark/blob/e430614e/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index debd04a..36fea4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -66,11 +66,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case ExtractEquiJoinKeys(
              LeftSemi, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
         joins.BroadcastLeftSemiJoinHash(
-          leftKeys, rightKeys, planLater(left), planLater(right), condition) :: Nil
+          leftKeys, rightKeys, planLater(left), planLater(right),
+          reorderPredicates(condition)) :: Nil
       // Find left semi joins where at least some predicates can be evaluated by matching join keys
       case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) =>
         joins.LeftSemiJoinHash(
-          leftKeys, rightKeys, planLater(left), planLater(right), condition) :: Nil
+          leftKeys, rightKeys, planLater(left), planLater(right),
+          reorderPredicates(condition)) :: Nil
       case _ => Nil
     }
   }
@@ -111,33 +113,39 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
         Seq(joins.BroadcastHashJoin(
-          leftKeys, rightKeys, Inner, BuildRight, condition, planLater(left), planLater(right)))
+          leftKeys, rightKeys, Inner, BuildRight, reorderPredicates(condition),
+          planLater(left), planLater(right)))
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, CanBroadcast(left), right) =>
         Seq(joins.BroadcastHashJoin(
-          leftKeys, rightKeys, Inner, BuildLeft, condition, planLater(left), planLater(right)))
+          leftKeys, rightKeys, Inner, BuildLeft, reorderPredicates(condition), planLater(left),
+          planLater(right)))
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
         if RowOrdering.isOrderable(leftKeys) =>
         joins.SortMergeJoin(
-          leftKeys, rightKeys, condition, planLater(left), planLater(right)) :: Nil
+          leftKeys, rightKeys, reorderPredicates(condition), planLater(left),
+          planLater(right)) :: Nil
 
       // --- Outer joins --------------------------------------------------------------------------
 
       case ExtractEquiJoinKeys(
           LeftOuter, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
         Seq(joins.BroadcastHashJoin(
-          leftKeys, rightKeys, LeftOuter, BuildRight, condition, planLater(left), planLater(right)))
+          leftKeys, rightKeys, LeftOuter, BuildRight, reorderPredicates(condition),
+          planLater(left), planLater(right)))
 
       case ExtractEquiJoinKeys(
           RightOuter, leftKeys, rightKeys, condition, CanBroadcast(left), right) =>
         Seq(joins.BroadcastHashJoin(
-          leftKeys, rightKeys, RightOuter, BuildLeft, condition, planLater(left), planLater(right)))
+          leftKeys, rightKeys, RightOuter, BuildLeft, reorderPredicates(condition),
+          planLater(left), planLater(right)))
 
       case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
         if RowOrdering.isOrderable(leftKeys) =>
         joins.SortMergeOuterJoin(
-          leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
+          leftKeys, rightKeys, joinType, reorderPredicates(condition), planLater(left),
+          planLater(right)) :: Nil
 
       // --- Cases where this strategy does not apply ---------------------------------------------
 
@@ -252,10 +260,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case j @ logical.Join(CanBroadcast(left), right, Inner | RightOuter, condition) =>
         execution.joins.BroadcastNestedLoopJoin(
-          planLater(left), planLater(right), joins.BuildLeft, j.joinType, condition) :: Nil
+          planLater(left), planLater(right), joins.BuildLeft, j.joinType,
+          reorderPredicates(condition)) :: Nil
       case j @ logical.Join(left, CanBroadcast(right), Inner | LeftOuter | LeftSemi, condition) =>
         execution.joins.BroadcastNestedLoopJoin(
-          planLater(left), planLater(right), joins.BuildRight, j.joinType, condition) :: Nil
+          planLater(left), planLater(right), joins.BuildRight, j.joinType,
+          reorderPredicates(condition)) :: Nil
       case _ => Nil
     }
   }
@@ -265,7 +275,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.Join(left, right, Inner, None) =>
         execution.joins.CartesianProduct(planLater(left), planLater(right)) :: Nil
       case logical.Join(left, right, Inner, Some(condition)) =>
-        execution.Filter(condition,
+        execution.Filter(reorderPredicates(condition),
           execution.joins.CartesianProduct(planLater(left), planLater(right))) :: Nil
       case _ => Nil
     }
@@ -282,7 +292,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           }
         // This join could be very slow or even hang forever
         joins.BroadcastNestedLoopJoin(
-          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
+          planLater(left), planLater(right), buildSide, joinType,
+          reorderPredicates(condition)) :: Nil
       case _ => Nil
     }
   }
@@ -341,7 +352,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.Project(projectList, child) =>
         execution.Project(projectList, planLater(child)) :: Nil
       case logical.Filter(condition, child) =>
-        execution.Filter(condition, planLater(child)) :: Nil
+        execution.Filter(reorderPredicates(condition), planLater(child)) :: Nil
       case e @ logical.Expand(_, _, child) =>
         execution.Expand(e.projections, e.output, planLater(child)) :: Nil
       case logical.Window(projectList, windowExprs, partitionSpec, orderSpec, child) =>
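
To observe the effect end to end, the physical plan can be inspected with `explain`; a hedged example against a Spark 1.6-era `SQLContext` (the `testData` table is assumed to be registered, as in the test suite below):

```scala
// Assumes a SQLContext with the testData table registered, as in the test
// suite below; the exact plan text varies by version and is illustrative.
val df = sqlContext.sql(
  "SELECT * FROM testData WHERE value != '5' AND value IS NOT NULL")
// In the physical plan's Filter, the isnotnull(value) check should now
// appear before the value != '5' comparison.
df.explain(true)
```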

http://git-wip-us.apache.org/repos/asf/spark/blob/e430614e/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala
new file mode 100644
index 0000000..dd0e438
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, IsNotNull, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.Join
+import org.apache.spark.sql.execution
+import org.apache.spark.sql.execution.joins.LeftSemiJoinHash
+import org.apache.spark.sql.test.SharedSQLContext
+
+
+class ReorderedPredicateSuite extends SharedSQLContext with PredicateHelper {
+
+  setupTestData()
+
+  // Verifies that (a) in the new condition, the IsNotNull operators precede the rest of the operators
+  // and (b) The relative sort order of IsNotNull and !IsNotNull operators is still maintained
+  private def verifyStableOrder(before: Expression, after: Expression): Unit = {
+    val oldPredicates = splitConjunctivePredicates(before)
+    splitConjunctivePredicates(after).sliding(2).foreach { case Seq(x, y) =>
+      // Verify IsNotNull operator ordering
+      assert(x.isInstanceOf[IsNotNull] || !y.isInstanceOf[IsNotNull])
+
+      // Verify stable sort order
+      if ((x.isInstanceOf[IsNotNull] && y.isInstanceOf[IsNotNull]) ||
+        (!x.isInstanceOf[IsNotNull] && !y.isInstanceOf[IsNotNull])) {
+        assert(oldPredicates.indexOf(x) <= oldPredicates.indexOf(y))
+      }
+    }
+  }
+
+  test("null ordering in filter predicates") {
+    val query = sql(
+      """
+        |SELECT * from testData
+        |WHERE value != '5' AND value != '4' AND value IS NOT NULL AND key != 5
+      """.stripMargin)
+      .queryExecution
+
+    val logicalPlan = query.optimizedPlan
+    val physicalPlan = query.sparkPlan
+    assert(logicalPlan.find(_.isInstanceOf[logical.Filter]).isDefined)
+    assert(physicalPlan.find(_.isInstanceOf[execution.Filter]).isDefined)
+
+    val logicalCondition = logicalPlan.collect {
+      case logical.Filter(condition, _) =>
+        condition
+    }.head
+
+    val physicalCondition = physicalPlan.collect {
+      case Filter(condition, _) =>
+        condition
+    }.head
+
+    verifyStableOrder(logicalCondition, physicalCondition)
+  }
+
+  test("null ordering in join predicates") {
+    sqlContext.cacheManager.clearCache()
+    val query = sql(
+      """
+        |SELECT * FROM testData t1
+        |LEFT SEMI JOIN testData t2
+        |ON t1.key = t2.key
+        |AND t1.key + t2.key != 5
+        |AND CONCAT(t1.value, t2.value) IS NOT NULL
+      """.stripMargin)
+      .queryExecution
+
+    val logicalPlan = query.optimizedPlan
+    val physicalPlan = query.sparkPlan
+    assert(logicalPlan.find(_.isInstanceOf[Join]).isDefined)
+    assert(physicalPlan.find(_.isInstanceOf[LeftSemiJoinHash]).isDefined)
+
+    val logicalCondition = logicalPlan.collect {
+      case Join(_, _, _, condition) =>
+        condition.get
+    }.head
+
+    val physicalCondition = physicalPlan.collect {
+      case LeftSemiJoinHash(_, _, _, _, conditionOpt) =>
+        conditionOpt.get
+    }.head
+
+    verifyStableOrder(logicalCondition, physicalCondition)
+  }
+}
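
The pairwise check in `verifyStableOrder` can be distilled to a small invariant over Booleans (a standalone sketch, where `true` marks an `IsNotNull` conjunct): once a non-`IsNotNull` predicate appears, no `IsNotNull` may follow.

```scala
// Standalone sketch of the ordering invariant the suite asserts: encode each
// conjunct as true (IsNotNull) or false (anything else). After reordering,
// no true may follow a false.
def isWellOrdered(flags: Seq[Boolean]): Boolean =
  flags.sliding(2).forall {
    case Seq(x, y) => x || !y // if x is not a null check, y must not be either
    case _         => true    // zero or one conjunct is trivially ordered
  }

assert(isWellOrdered(Seq(true, true, false, false)))
assert(!isWellOrdered(Seq(false, true)))
```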

