You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2016/02/20 07:27:16 UTC

spark git commit: [SPARK-12594] [SQL] Outer Join Elimination by Filter Conditions

Repository: spark
Updated Branches:
  refs/heads/master 983fa2d62 -> ec7a1d6e4


[SPARK-12594] [SQL] Outer Join Elimination by Filter Conditions

Convert outer joins to cheaper join types when the predicates in filter conditions restrict the result set so that all null-supplying rows are eliminated:

- `full outer` -> `inner` if both sides have such predicates
- `left outer` -> `inner` if the right side has such predicates
- `right outer` -> `inner` if the left side has such predicates
- `full outer` -> `left outer` if only the left side has such predicates
- `full outer` -> `right outer` if only the right side has such predicates

When applicable, this can greatly improve performance, since an outer join is much slower than an inner join, and a full outer join is much slower than a left/right outer join.

The original PR is https://github.com/apache/spark/pull/10542

Author: gatorsmile <ga...@gmail.com>
Author: xiaoli <li...@gmail.com>
Author: Xiao Li <xi...@Xiaos-MacBook-Pro.local>

Closes #10567 from gatorsmile/outerJoinEliminationByFilterCond.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec7a1d6e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec7a1d6e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec7a1d6e

Branch: refs/heads/master
Commit: ec7a1d6e425509f2472c3ae9497c7da796ce8129
Parents: 983fa2d
Author: gatorsmile <ga...@gmail.com>
Authored: Fri Feb 19 22:27:10 2016 -0800
Committer: Davies Liu <da...@gmail.com>
Committed: Fri Feb 19 22:27:10 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      |  59 +++++-
 .../optimizer/OuterJoinEliminationSuite.scala   | 195 +++++++++++++++++++
 .../apache/spark/sql/DataFrameJoinSuite.scala   |  48 +++++
 .../scala/org/apache/spark/sql/JoinSuite.scala  |   2 +-
 4 files changed, 302 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ec7a1d6e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 55c168d..b7d8d93 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -21,8 +21,8 @@ import scala.collection.immutable.HashSet
 
 import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
 import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
 import org.apache.spark.sql.catalyst.planning.{ExtractFiltersAndInnerJoins, Unions}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -62,6 +62,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
       SetOperationPushDown,
       SamplePushDown,
       ReorderJoin,
+      OuterJoinElimination,
       PushPredicateThroughJoin,
       PushPredicateThroughProject,
       PushPredicateThroughGenerate,
@@ -932,6 +933,62 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
 }
 
 /**
+ * Elimination of outer joins, if the predicates can restrict the result sets so that
+ * all null-supplying rows are eliminated:
+ *
+ * - full outer -> inner if both sides have such predicates
+ * - left outer -> inner if the right side has such predicates
+ * - right outer -> inner if the left side has such predicates
+ * - full outer -> left outer if only the left side has such predicates
+ * - full outer -> right outer if only the right side has such predicates
+ *
+ * This rule should be executed before pushing down the Filter.
+ */
+object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper {
+
+  /**
+   * Returns whether the expression returns null or false when all of its inputs are null.
+   */
+  private def canFilterOutNull(e: Expression): Boolean = {
+    if (!e.deterministic) return false
+    val attributes = e.references.toSeq
+    val emptyRow = new GenericInternalRow(attributes.length)
+    val v = BindReferences.bindReference(e, attributes).eval(emptyRow)
+    v == null || v == false
+  }
+
+  /**
+   * Returns the join type to use once the null-supplying rows that `filter` always rejects
+   * are eliminated, or the original join type if no side can be proven null-rejecting.
+   */
+  private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
+    // Inferred constraints (e.g. IsNotNull) are tested like ordinary conjuncts, but a conjunct
+    // only counts for a side when ALL of its references come from that side.
+    val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
+    val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
+    val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
+
+    lazy val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
+    lazy val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
+
+    join.joinType match {
+      case RightOuter if leftHasNonNullPredicate => Inner
+      case LeftOuter if rightHasNonNullPredicate => Inner
+      case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner
+      case FullOuter if leftHasNonNullPredicate => LeftOuter
+      case FullOuter if rightHasNonNullPredicate => RightOuter
+      case o => o
+    }
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
+      val newJoinType = buildNewJoinType(f, j)
+      if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
+  }
+}
+
+/**
  * Pushes down [[Filter]] operators where the `condition` can be
  * evaluated using only the attributes of the left or right side of a join.  Other
  * [[Filter]] conditions are moved into the `condition` of the [[Join]].

http://git-wip-us.apache.org/repos/asf/spark/blob/ec7a1d6e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
new file mode 100644
index 0000000..a1dc836
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+
+class OuterJoinEliminationSuite extends PlanTest {
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Subqueries", Once,
+        EliminateSubQueries) ::
+      Batch("Outer Join Elimination", Once,
+        OuterJoinElimination,
+        PushPredicateThroughJoin) :: Nil
+  }
+
+  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+  val testRelation1 = LocalRelation('d.int, 'e.int, 'f.int)
+
+  test("joins: full outer to inner") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
+        .where("x.b".attr >= 1 && "y.d".attr >= 2)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation.where('b >= 1)
+    val right = testRelation1.where('d >= 2)
+    val correctAnswer =
+      left.join(right, Inner, Option("a".attr === "d".attr)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("joins: full outer to right") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, FullOuter, Option("x.a".attr === "y.d".attr)).where("y.d".attr > 2)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation
+    val right = testRelation1.where('d > 2)
+    val correctAnswer =
+      left.join(right, RightOuter, Option("a".attr === "d".attr)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("joins: full outer to left") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, FullOuter, Option("x.a".attr === "y.d".attr)).where("x.a".attr <=> 2)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation.where('a <=> 2)
+    val right = testRelation1
+    val correctAnswer =
+      left.join(right, LeftOuter, Option("a".attr === "d".attr)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("joins: right to inner") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, RightOuter, Option("x.a".attr === "y.d".attr)).where("x.b".attr > 2)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation.where('b > 2)
+    val right = testRelation1
+    val correctAnswer =
+      left.join(right, Inner, Option("a".attr === "d".attr)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("joins: left to inner") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr))
+        .where("y.e".attr.isNotNull)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation
+    val right = testRelation1.where('e.isNotNull)
+    val correctAnswer =
+      left.join(right, Inner, Option("a".attr === "d".attr)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  // evaluating if mixed OR and NOT expressions can eliminate all null-supplying rows
+  test("joins: left to inner with complicated filter predicates #1") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr))
+        .where(!'e.isNull || ('d.isNotNull && 'f.isNull))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation
+    val right = testRelation1.where(!'e.isNull || ('d.isNotNull && 'f.isNull))
+    val correctAnswer =
+      left.join(right, Inner, Option("a".attr === "d".attr)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  // 'e.in(1, 2) evaluates to null (not false) on an all-null row, which still rejects the row
+  test("joins: left to inner with complicated filter predicates #2") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr))
+        .where('e.in(1, 2))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation
+    val right = testRelation1.where('e.in(1, 2))
+    val correctAnswer =
+      left.join(right, Inner, Option("a".attr === "d".attr)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  // evaluating if mixed OR and AND expressions can eliminate all null-supplying rows
+  test("joins: left to inner with complicated filter predicates #3") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr))
+        .where((!'e.isNull || ('d.isNotNull && 'f.isNull)) && 'e.isNull)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation
+    val right = testRelation1.where((!'e.isNull || ('d.isNotNull && 'f.isNull)) && 'e.isNull)
+    val correctAnswer =
+      left.join(right, Inner, Option("a".attr === "d".attr)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  // evaluating if the expressions that have both left and right attributes
+  // can eliminate all null-supplying rows
+  // FULL OUTER => INNER
+  test("joins: full outer to inner with complicated filter predicates #4") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
+        .where("x.b".attr + 3 === "y.e".attr)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation
+    val right = testRelation1
+    val correctAnswer =
+      left.join(right, Inner, Option("b".attr + 3 === "e".attr && "a".attr === "d".attr)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ec7a1d6e/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index a5e5f15..067a62d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.logical.Join
 import org.apache.spark.sql.execution.joins.BroadcastHashJoin
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
@@ -156,4 +158,50 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       assert(df1.join(broadcast(pf1)).count() === 4)
     }
   }
+
+  test("join - outer join conversion") {
+    val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
+    val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
+
+    // full outer -> left outer: the predicate on a.int rejects rows with a null left side
+    val outerJoin2Left = df.join(df2, $"a.int" === $"b.int", "outer").where($"a.int" === 3)
+    assert(outerJoin2Left.queryExecution.optimizedPlan.collect {
+      case j @ Join(_, _, LeftOuter, _) => j }.size === 1)
+    checkAnswer(
+      outerJoin2Left,
+      Row(3, 4, "3", null, null, null) :: Nil)
+
+    // full outer -> right outer: the predicate on b.int rejects rows with a null right side
+    val outerJoin2Right = df.join(df2, $"a.int" === $"b.int", "outer").where($"b.int" === 5)
+    assert(outerJoin2Right.queryExecution.optimizedPlan.collect {
+      case j @ Join(_, _, RightOuter, _) => j }.size === 1)
+    checkAnswer(
+      outerJoin2Right,
+      Row(null, null, null, 5, 6, "5") :: Nil)
+
+    // full outer -> inner: both sides have a null-rejecting predicate
+    val outerJoin2Inner = df.join(df2, $"a.int" === $"b.int", "outer")
+      .where($"a.int" === 1 && $"b.int2" === 3)
+    assert(outerJoin2Inner.queryExecution.optimizedPlan.collect {
+      case j @ Join(_, _, Inner, _) => j }.size === 1)
+    checkAnswer(
+      outerJoin2Inner,
+      Row(1, 2, "1", 1, 3, "1") :: Nil)
+
+    // right outer -> inner: the predicate on the null-supplying left side rejects nulls
+    val rightJoin2Inner = df.join(df2, $"a.int" === $"b.int", "right").where($"a.int" === 1)
+    assert(rightJoin2Inner.queryExecution.optimizedPlan.collect {
+      case j @ Join(_, _, Inner, _) => j }.size === 1)
+    checkAnswer(
+      rightJoin2Inner,
+      Row(1, 2, "1", 1, 3, "1") :: Nil)
+
+    // left outer -> inner: the predicate on the null-supplying right side rejects nulls
+    val leftJoin2Inner = df.join(df2, $"a.int" === $"b.int", "left").where($"b.int2" === 3)
+    assert(leftJoin2Inner.queryExecution.optimizedPlan.collect {
+      case j @ Join(_, _, Inner, _) => j }.size === 1)
+    checkAnswer(
+      leftJoin2Inner,
+      Row(1, 2, "1", 1, 3, "1") :: Nil)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ec7a1d6e/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 92ff7e7..8f2a0c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -81,7 +81,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
       ("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[SortMergeJoin]),
       ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[SortMergeOuterJoin]),
       ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
-        classOf[SortMergeOuterJoin]),
+        classOf[SortMergeJoin]), // converted from Right Outer to Inner
       ("SELECT * FROM testData right join testData2 ON key = a and key = 2",
         classOf[SortMergeOuterJoin]),
       ("SELECT * FROM testData full outer join testData2 ON key = a",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org