You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/08/27 07:36:20 UTC

spark git commit: [SPARK-17274][SQL] Move join optimizer rules into a separate file

Repository: spark
Updated Branches:
  refs/heads/master 5aad4509c -> 718b6bad2


[SPARK-17274][SQL] Move join optimizer rules into a separate file

## What changes were proposed in this pull request?
As part of breaking Optimizer.scala apart, this patch moves various join rules into a single file.

## How was this patch tested?
This should be covered by existing tests.

Author: Reynold Xin <rx...@databricks.com>

Closes #14846 from rxin/SPARK-17274.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/718b6bad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/718b6bad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/718b6bad

Branch: refs/heads/master
Commit: 718b6bad2d698b76be6906d51da13626e9f3890e
Parents: 5aad450
Author: Reynold Xin <rx...@databricks.com>
Authored: Sat Aug 27 00:36:18 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sat Aug 27 00:36:18 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 106 ---------------
 .../spark/sql/catalyst/optimizer/joins.scala    | 134 +++++++++++++++++++
 2 files changed, 134 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/718b6bad/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 17cab18..7617d34 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -800,112 +800,6 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
 }
 
 /**
- * Reorder the joins and push all the conditions into join, so that the bottom ones have at least
- * one condition.
- *
- * The order of joins will not be changed if all of them already have at least one condition.
- */
-object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
-
-  /**
-   * Join a list of plans together and push down the conditions into them.
-   *
-   * The joined plan are picked from left to right, prefer those has at least one join condition.
-   *
-   * @param input a list of LogicalPlans to join.
-   * @param conditions a list of condition for join.
-   */
-  @tailrec
-  def createOrderedJoin(input: Seq[LogicalPlan], conditions: Seq[Expression]): LogicalPlan = {
-    assert(input.size >= 2)
-    if (input.size == 2) {
-      val (joinConditions, others) = conditions.partition(
-        e => !SubqueryExpression.hasCorrelatedSubquery(e))
-      val join = Join(input(0), input(1), Inner, joinConditions.reduceLeftOption(And))
-      if (others.nonEmpty) {
-        Filter(others.reduceLeft(And), join)
-      } else {
-        join
-      }
-    } else {
-      val left :: rest = input.toList
-      // find out the first join that have at least one join condition
-      val conditionalJoin = rest.find { plan =>
-        val refs = left.outputSet ++ plan.outputSet
-        conditions.filterNot(canEvaluate(_, left)).filterNot(canEvaluate(_, plan))
-          .exists(_.references.subsetOf(refs))
-      }
-      // pick the next one if no condition left
-      val right = conditionalJoin.getOrElse(rest.head)
-
-      val joinedRefs = left.outputSet ++ right.outputSet
-      val (joinConditions, others) = conditions.partition(
-        e => e.references.subsetOf(joinedRefs) && !SubqueryExpression.hasCorrelatedSubquery(e))
-      val joined = Join(left, right, Inner, joinConditions.reduceLeftOption(And))
-
-      // should not have reference to same logical plan
-      createOrderedJoin(Seq(joined) ++ rest.filterNot(_ eq right), others)
-    }
-  }
-
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case j @ ExtractFiltersAndInnerJoins(input, conditions)
-        if input.size > 2 && conditions.nonEmpty =>
-      createOrderedJoin(input, conditions)
-  }
-}
-
-/**
- * Elimination of outer joins, if the predicates can restrict the result sets so that
- * all null-supplying rows are eliminated
- *
- * - full outer -> inner if both sides have such predicates
- * - left outer -> inner if the right side has such predicates
- * - right outer -> inner if the left side has such predicates
- * - full outer -> left outer if only the left side has such predicates
- * - full outer -> right outer if only the right side has such predicates
- *
- * This rule should be executed before pushing down the Filter
- */
-object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
-
-  /**
-   * Returns whether the expression returns null or false when all inputs are nulls.
-   */
-  private def canFilterOutNull(e: Expression): Boolean = {
-    if (!e.deterministic || SubqueryExpression.hasCorrelatedSubquery(e)) return false
-    val attributes = e.references.toSeq
-    val emptyRow = new GenericInternalRow(attributes.length)
-    val v = BindReferences.bindReference(e, attributes).eval(emptyRow)
-    v == null || v == false
-  }
-
-  private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
-    val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
-    val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
-    val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
-
-    val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
-    val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
-
-    join.joinType match {
-      case RightOuter if leftHasNonNullPredicate => Inner
-      case LeftOuter if rightHasNonNullPredicate => Inner
-      case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner
-      case FullOuter if leftHasNonNullPredicate => LeftOuter
-      case FullOuter if rightHasNonNullPredicate => RightOuter
-      case o => o
-    }
-  }
-
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
-      val newJoinType = buildNewJoinType(f, j)
-      if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
-  }
-}
-
-/**
  * Pushes down [[Filter]] operators where the `condition` can be
  * evaluated using only the attributes of the left or right side of a join.  Other
  * [[Filter]] conditions are moved into the `condition` of the [[Join]].

http://git-wip-us.apache.org/repos/asf/spark/blob/718b6bad/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
new file mode 100644
index 0000000..158ad3d
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.annotation.tailrec
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+
+
+/**
+ * Reorders inner joins and pushes all the conditions into the joins, so that the bottom ones have
+ * at least one condition.
+ *
+ * The order of joins will not be changed if all of them already have at least one condition.
+ */
+object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
+
+  /**
+   * Joins a list of plans together and pushes down the conditions into them.
+   *
+   * Plans are picked from left to right, preferring those that have at least one join condition.
+   *
+   * @param input a list of LogicalPlans to join; asserted to contain at least two plans.
+   * @param conditions a list of conditions for the joins.
+   */
+  @tailrec
+  def createOrderedJoin(input: Seq[LogicalPlan], conditions: Seq[Expression]): LogicalPlan = {
+    assert(input.size >= 2)
+    if (input.size == 2) {
+      val (joinConditions, others) = conditions.partition(
+        e => !SubqueryExpression.hasCorrelatedSubquery(e))
+      val join = Join(input(0), input(1), Inner, joinConditions.reduceLeftOption(And))
+      if (others.nonEmpty) {  // predicates with correlated subqueries stay in a Filter on top
+        Filter(others.reduceLeft(And), join)
+      } else {
+        join
+      }
+    } else {
+      val left :: rest = input.toList
+      // find the first plan that has at least one join condition with `left`
+      val conditionalJoin = rest.find { plan =>
+        val refs = left.outputSet ++ plan.outputSet
+        conditions.filterNot(canEvaluate(_, left)).filterNot(canEvaluate(_, plan))
+          .exists(_.references.subsetOf(refs))
+      }
+      // fall back to the next plan in order if no plan has such a condition
+      val right = conditionalJoin.getOrElse(rest.head)
+
+      val joinedRefs = left.outputSet ++ right.outputSet
+      val (joinConditions, others) = conditions.partition(
+        e => e.references.subsetOf(joinedRefs) && !SubqueryExpression.hasCorrelatedSubquery(e))
+      val joined = Join(left, right, Inner, joinConditions.reduceLeftOption(And))
+
+      // drop `right` (by reference) from the remaining plans; unused conditions carry over
+      createOrderedJoin(Seq(joined) ++ rest.filterNot(_ eq right), others)
+    }
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case j @ ExtractFiltersAndInnerJoins(input, conditions)
+        if input.size > 2 && conditions.nonEmpty =>
+      createOrderedJoin(input, conditions)
+  }
+}
+
+
+/**
+ * Eliminates outer joins when the predicates can restrict the result sets so that
+ * all null-supplying rows are eliminated:
+ *
+ * - full outer -> inner if both sides have such predicates
+ * - left outer -> inner if the right side has such predicates
+ * - right outer -> inner if the left side has such predicates
+ * - full outer -> left outer if only the left side has such predicates
+ * - full outer -> right outer if only the right side has such predicates
+ *
+ * This rule should be executed before pushing down the Filter.
+ */
+object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
+
+  /**
+   * Returns whether `e` evaluates to null or false when all of its referenced attributes are null.
+   */
+  private def canFilterOutNull(e: Expression): Boolean = {
+    if (!e.deterministic || SubqueryExpression.hasCorrelatedSubquery(e)) return false
+    val attributes = e.references.toSeq
+    val emptyRow = new GenericInternalRow(attributes.length)  // one slot per reference
+    val v = BindReferences.bindReference(e, attributes).eval(emptyRow)
+    v == null || v == false
+  }
+
+  private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
+    val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
+    val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
+    val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
+
+    val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
+    val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
+
+    join.joinType match {
+      case RightOuter if leftHasNonNullPredicate => Inner
+      case LeftOuter if rightHasNonNullPredicate => Inner
+      case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner
+      case FullOuter if leftHasNonNullPredicate => LeftOuter
+      case FullOuter if rightHasNonNullPredicate => RightOuter
+      case o => o  // no applicable predicate: keep the join type unchanged
+    }
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
+      val newJoinType = buildNewJoinType(f, j)
+      if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org