You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/03/21 15:44:12 UTC

spark git commit: [SPARK-17080][SQL][FOLLOWUP] Improve documentation, change buildJoin method structure and add a debug log

Repository: spark
Updated Branches:
  refs/heads/master 650d03cfc -> 14865d7ff


[SPARK-17080][SQL][FOLLOWUP] Improve documentation, change buildJoin method structure and add a debug log

## What changes were proposed in this pull request?

1. Improve documentation for class `Cost` and `JoinReorderDP` and method `buildJoin()`.
2. Change code structure of `buildJoin()` to make the logic clearer.
3. Add a debug-level log to record information for join reordering, including time cost, the number of items and the number of plans in memo.

## How was this patch tested?

Not applicable.

Author: wangzhenhua <wa...@huawei.com>

Closes #17353 from wzhfy/reorderFollow.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14865d7f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14865d7f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14865d7f

Branch: refs/heads/master
Commit: 14865d7ff78db5cf9a3e8626204c8e7ed059c353
Parents: 650d03c
Author: wangzhenhua <wa...@huawei.com>
Authored: Tue Mar 21 08:44:09 2017 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Tue Mar 21 08:44:09 2017 -0700

----------------------------------------------------------------------
 .../optimizer/CostBasedJoinReorder.scala        | 109 ++++++++++++-------
 .../org/apache/spark/sql/internal/SQLConf.scala |   1 +
 2 files changed, 68 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/14865d7f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
index 521c468..fc37720 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import scala.collection.mutable
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeSet, Expression, PredicateHelper}
 import org.apache.spark.sql.catalyst.plans.{Inner, InnerLike}
 import org.apache.spark.sql.catalyst.plans.logical.{BinaryNode, Join, LogicalPlan, Project}
@@ -51,7 +52,7 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr
     }
   }
 
-  def reorder(plan: LogicalPlan, output: AttributeSet): LogicalPlan = {
+  private def reorder(plan: LogicalPlan, output: AttributeSet): LogicalPlan = {
     val (items, conditions) = extractInnerJoins(plan)
     // TODO: Compute the set of star-joins and use them in the join enumeration
     // algorithm to prune un-optimal plan choices.
@@ -69,7 +70,7 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr
   }
 
   /**
-   * Extract consecutive inner joinable items and join conditions.
+   * Extracts items of consecutive inner joins and join conditions.
    * This method works for bushy trees and left/right deep trees.
    */
   private def extractInnerJoins(plan: LogicalPlan): (Seq[LogicalPlan], Set[Expression]) = {
@@ -119,18 +120,21 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr
  * When building m-way joins, we only keep the best plan (with the lowest cost) for the same set
  * of m items. E.g., for 3-way joins, we keep only the best plan for items {A, B, C} among
  * plans (A J B) J C, (A J C) J B and (B J C) J A.
- *
- * Thus the plans maintained for each level when reordering four items A, B, C, D are as follows:
+ * We also prune cartesian product candidates when building a new plan if there exists no join
+ * condition involving references from both left and right. This pruning strategy significantly
+ * reduces the search space.
+ * E.g., given A J B J C J D with join conditions A.k1 = B.k1 and B.k2 = C.k2 and C.k3 = D.k3,
+ * plans maintained for each level are as follows:
  * level 0: p({A}), p({B}), p({C}), p({D})
- * level 1: p({A, B}), p({A, C}), p({A, D}), p({B, C}), p({B, D}), p({C, D})
- * level 2: p({A, B, C}), p({A, B, D}), p({A, C, D}), p({B, C, D})
+ * level 1: p({A, B}), p({B, C}), p({C, D})
+ * level 2: p({A, B, C}), p({B, C, D})
  * level 3: p({A, B, C, D})
  * where p({A, B, C, D}) is the final output plan.
  *
  * For cost evaluation, since physical costs for operators are not available currently, we use
  * cardinalities and sizes to compute costs.
  */
-object JoinReorderDP extends PredicateHelper {
+object JoinReorderDP extends PredicateHelper with Logging {
 
   def search(
       conf: SQLConf,
@@ -138,6 +142,7 @@ object JoinReorderDP extends PredicateHelper {
       conditions: Set[Expression],
       topOutput: AttributeSet): LogicalPlan = {
 
+    val startTime = System.nanoTime()
     // Level i maintains all found plans for i + 1 items.
     // Create the initial plans: each plan is a single item with zero cost.
     val itemIndex = items.zipWithIndex
@@ -152,6 +157,10 @@ object JoinReorderDP extends PredicateHelper {
       foundPlans += searchLevel(foundPlans, conf, conditions, topOutput)
     }
 
+    val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000)
+    logDebug(s"Join reordering finished. Duration: $durationInMs ms, number of items: " +
+      s"${items.length}, number of plans in memo: ${foundPlans.map(_.size).sum}")
+
     // The last level must have one and only one plan, because all items are joinable.
     assert(foundPlans.size == items.length && foundPlans.last.size == 1)
     foundPlans.last.head._2.plan
@@ -183,18 +192,15 @@ object JoinReorderDP extends PredicateHelper {
         }
 
         otherSideCandidates.foreach { otherSidePlan =>
-          // Should not join two overlapping item sets.
-          if (oneSidePlan.itemIds.intersect(otherSidePlan.itemIds).isEmpty) {
-            val joinPlan = buildJoin(oneSidePlan, otherSidePlan, conf, conditions, topOutput)
-            if (joinPlan.isDefined) {
-              val newJoinPlan = joinPlan.get
+          buildJoin(oneSidePlan, otherSidePlan, conf, conditions, topOutput) match {
+            case Some(newJoinPlan) =>
               // Check if it's the first plan for the item set, or it's a better plan than
               // the existing one due to lower cost.
               val existingPlan = nextLevel.get(newJoinPlan.itemIds)
               if (existingPlan.isEmpty || newJoinPlan.betterThan(existingPlan.get, conf)) {
                 nextLevel.update(newJoinPlan.itemIds, newJoinPlan)
               }
-            }
+            case None =>
           }
         }
       }
@@ -203,7 +209,17 @@ object JoinReorderDP extends PredicateHelper {
     nextLevel.toMap
   }
 
-  /** Build a new join node. */
+  /**
+   * Builds a new JoinPlan when both conditions hold:
+   * - the sets of items contained in left and right sides do not overlap.
+   * - there exists at least one join condition involving references from both sides.
+   * @param oneJoinPlan One side JoinPlan for building a new JoinPlan.
+   * @param otherJoinPlan The other side JoinPlan for building a new JoinPlan.
+   * @param conf SQLConf for statistics computation.
+   * @param conditions The overall set of join conditions.
+   * @param topOutput The output attributes of the final plan.
+   * @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None.
+   */
   private def buildJoin(
       oneJoinPlan: JoinPlan,
       otherJoinPlan: JoinPlan,
@@ -211,6 +227,11 @@ object JoinReorderDP extends PredicateHelper {
       conditions: Set[Expression],
       topOutput: AttributeSet): Option[JoinPlan] = {
 
+    if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) {
+      // Should not join two overlapping item sets.
+      return None
+    }
+
     val onePlan = oneJoinPlan.plan
     val otherPlan = otherJoinPlan.plan
     val joinConds = conditions
@@ -220,33 +241,33 @@ object JoinReorderDP extends PredicateHelper {
     if (joinConds.isEmpty) {
       // Cartesian product is very expensive, so we exclude them from candidate plans.
       // This also significantly reduces the search space.
-      None
+      return None
+    }
+
+    // Put the deeper side on the left, so that we tend to build a left-deep tree.
+    val (left, right) = if (oneJoinPlan.itemIds.size >= otherJoinPlan.itemIds.size) {
+      (onePlan, otherPlan)
     } else {
-      // Put the deeper side on the left, tend to build a left-deep tree.
-      val (left, right) = if (oneJoinPlan.itemIds.size >= otherJoinPlan.itemIds.size) {
-        (onePlan, otherPlan)
+      (otherPlan, onePlan)
+    }
+    val newJoin = Join(left, right, Inner, joinConds.reduceOption(And))
+    val collectedJoinConds = joinConds ++ oneJoinPlan.joinConds ++ otherJoinPlan.joinConds
+    val remainingConds = conditions -- collectedJoinConds
+    val neededAttr = AttributeSet(remainingConds.flatMap(_.references)) ++ topOutput
+    val neededFromNewJoin = newJoin.outputSet.filter(neededAttr.contains)
+    val newPlan =
+      if ((newJoin.outputSet -- neededFromNewJoin).nonEmpty) {
+        Project(neededFromNewJoin.toSeq, newJoin)
       } else {
-        (otherPlan, onePlan)
+        newJoin
       }
-      val newJoin = Join(left, right, Inner, joinConds.reduceOption(And))
-      val collectedJoinConds = joinConds ++ oneJoinPlan.joinConds ++ otherJoinPlan.joinConds
-      val remainingConds = conditions -- collectedJoinConds
-      val neededAttr = AttributeSet(remainingConds.flatMap(_.references)) ++ topOutput
-      val neededFromNewJoin = newJoin.outputSet.filter(neededAttr.contains)
-      val newPlan =
-        if ((newJoin.outputSet -- neededFromNewJoin).nonEmpty) {
-          Project(neededFromNewJoin.toSeq, newJoin)
-        } else {
-          newJoin
-        }
 
-      val itemIds = oneJoinPlan.itemIds.union(otherJoinPlan.itemIds)
-      // Now the root node of onePlan/otherPlan becomes an intermediate join (if it's a non-leaf
-      // item), so the cost of the new join should also include its own cost.
-      val newPlanCost = oneJoinPlan.planCost + oneJoinPlan.rootCost(conf) +
-        otherJoinPlan.planCost + otherJoinPlan.rootCost(conf)
-      Some(JoinPlan(itemIds, newPlan, collectedJoinConds, newPlanCost))
-    }
+    val itemIds = oneJoinPlan.itemIds.union(otherJoinPlan.itemIds)
+    // Now the root node of onePlan/otherPlan becomes an intermediate join (if it's a non-leaf
+    // item), so the cost of the new join should also include its own cost.
+    val newPlanCost = oneJoinPlan.planCost + oneJoinPlan.rootCost(conf) +
+      otherJoinPlan.planCost + otherJoinPlan.rootCost(conf)
+    Some(JoinPlan(itemIds, newPlan, collectedJoinConds, newPlanCost))
   }
 
   /** Map[set of item ids, join plan for these items] */
@@ -278,10 +299,10 @@ object JoinReorderDP extends PredicateHelper {
     }
 
     def betterThan(other: JoinPlan, conf: SQLConf): Boolean = {
-      if (other.planCost.rows == 0 || other.planCost.size == 0) {
+      if (other.planCost.card == 0 || other.planCost.size == 0) {
         false
       } else {
-        val relativeRows = BigDecimal(this.planCost.rows) / BigDecimal(other.planCost.rows)
+        val relativeRows = BigDecimal(this.planCost.card) / BigDecimal(other.planCost.card)
         val relativeSize = BigDecimal(this.planCost.size) / BigDecimal(other.planCost.size)
         relativeRows * conf.joinReorderCardWeight +
           relativeSize * (1 - conf.joinReorderCardWeight) < 1
@@ -290,7 +311,11 @@ object JoinReorderDP extends PredicateHelper {
   }
 }
 
-/** This class defines the cost model. */
-case class Cost(rows: BigInt, size: BigInt) {
-  def +(other: Cost): Cost = Cost(this.rows + other.rows, this.size + other.size)
+/**
+ * This class defines the cost model for a plan.
+ * @param card Cardinality (number of rows).
+ * @param size Size in bytes.
+ */
+case class Cost(card: BigInt, size: BigInt) {
+  def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/14865d7f/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b6e0b8c..d5006c1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -708,6 +708,7 @@ object SQLConf {
     buildConf("spark.sql.cbo.joinReorder.dp.threshold")
       .doc("The maximum number of joined nodes allowed in the dynamic programming algorithm.")
       .intConf
+      .checkValue(number => number > 0, "The maximum number must be a positive integer.")
       .createWithDefault(12)
 
   val JOIN_REORDER_CARD_WEIGHT =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org