You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/04/13 14:27:23 UTC

spark git commit: [SPARK-20233][SQL] Apply star-join filter heuristics to dynamic programming join enumeration

Repository: spark
Updated Branches:
  refs/heads/master a4293c284 -> fbe4216e1


[SPARK-20233][SQL] Apply star-join filter heuristics to dynamic programming join enumeration

## What changes were proposed in this pull request?

Implements star-join filter to reduce the search space for dynamic programming join enumeration. Consider the following join graph:

```
T1       D1 - T2 - T3
  \     /
    F1
     |
    D2

star-join: {F1, D1, D2}
non-star: {T1, T2, T3}
```
The following join combinations will be generated:
```
level 0: (F1), (D1), (D2), (T1), (T2), (T3)
level 1: {F1, D1}, {F1, D2}, {T2, T3}
level 2: {F1, D1, D2}
level 3: {F1, D1, D2, T1}, {F1, D1, D2, T2}
level 4: {F1, D1, D2, T1, T2}, {F1, D1, D2, T2, T3 }
level 6: {F1, D1, D2, T1, T2, T3}
```

## How was this patch tested?

New test suite ```StarJOinCostBasedReorderSuite.scala```.

Author: Ioana Delaney <io...@gmail.com>

Closes #17546 from ioana-delaney/starSchemaCBOv3.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbe4216e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbe4216e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbe4216e

Branch: refs/heads/master
Commit: fbe4216e1e83d243a7f0521b76bfb20c25278281
Parents: a4293c2
Author: Ioana Delaney <io...@gmail.com>
Authored: Thu Apr 13 22:27:04 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Apr 13 22:27:04 2017 +0800

----------------------------------------------------------------------
 .../optimizer/CostBasedJoinReorder.scala        | 144 ++++++-
 .../optimizer/StarSchemaDetection.scala         |   2 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |   8 +
 .../StarJoinCostBasedReorderSuite.scala         | 426 +++++++++++++++++++
 4 files changed, 571 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fbe4216e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
index cbd5064..c704c2e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
@@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr
 
   private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = {
     val (items, conditions) = extractInnerJoins(plan)
-    // TODO: Compute the set of star-joins and use them in the join enumeration
-    // algorithm to prune un-optimal plan choices.
     val result =
       // Do reordering if the number of items is appropriate and join conditions exist.
       // We also need to check if costs of all items can be evaluated.
@@ -150,12 +148,15 @@ object JoinReorderDP extends PredicateHelper with Logging {
       case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0))
     }.toMap)
 
+    // Build filters from the join graph to be used by the search algorithm.
+    val filters = JoinReorderDPFilters.buildJoinGraphInfo(conf, items, conditions, itemIndex)
+
     // Build plans for next levels until the last level has only one plan. This plan contains
     // all items that can be joined, so there's no need to continue.
     val topOutputSet = AttributeSet(output)
-    while (foundPlans.size < items.length && foundPlans.last.size > 1) {
+    while (foundPlans.size < items.length) {
       // Build plans for the next level.
-      foundPlans += searchLevel(foundPlans, conf, conditions, topOutputSet)
+      foundPlans += searchLevel(foundPlans, conf, conditions, topOutputSet, filters)
     }
 
     val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000)
@@ -179,7 +180,8 @@ object JoinReorderDP extends PredicateHelper with Logging {
       existingLevels: Seq[JoinPlanMap],
       conf: SQLConf,
       conditions: Set[Expression],
-      topOutput: AttributeSet): JoinPlanMap = {
+      topOutput: AttributeSet,
+      filters: Option[JoinGraphInfo]): JoinPlanMap = {
 
     val nextLevel = mutable.Map.empty[Set[Int], JoinPlan]
     var k = 0
@@ -200,7 +202,7 @@ object JoinReorderDP extends PredicateHelper with Logging {
         }
 
         otherSideCandidates.foreach { otherSidePlan =>
-          buildJoin(oneSidePlan, otherSidePlan, conf, conditions, topOutput) match {
+          buildJoin(oneSidePlan, otherSidePlan, conf, conditions, topOutput, filters) match {
             case Some(newJoinPlan) =>
               // Check if it's the first plan for the item set, or it's a better plan than
               // the existing one due to lower cost.
@@ -218,14 +220,20 @@ object JoinReorderDP extends PredicateHelper with Logging {
   }
 
   /**
-   * Builds a new JoinPlan when both conditions hold:
+   * Builds a new JoinPlan if the following conditions hold:
    * - the sets of items contained in left and right sides do not overlap.
    * - there exists at least one join condition involving references from both sides.
+   * - if star-join filter is enabled, allow the following combinations:
+   *         1) (oneJoinPlan U otherJoinPlan) is a subset of star-join
+   *         2) star-join is a subset of (oneJoinPlan U otherJoinPlan)
+   *         3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join
+   *
    * @param oneJoinPlan One side JoinPlan for building a new JoinPlan.
    * @param otherJoinPlan The other side JoinPlan for building a new join node.
    * @param conf SQLConf for statistics computation.
    * @param conditions The overall set of join conditions.
    * @param topOutput The output attributes of the final plan.
+   * @param filters Join graph info to be used as filters by the search algorithm.
    * @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None.
    */
   private def buildJoin(
@@ -233,13 +241,27 @@ object JoinReorderDP extends PredicateHelper with Logging {
       otherJoinPlan: JoinPlan,
       conf: SQLConf,
       conditions: Set[Expression],
-      topOutput: AttributeSet): Option[JoinPlan] = {
+      topOutput: AttributeSet,
+      filters: Option[JoinGraphInfo]): Option[JoinPlan] = {
 
     if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) {
       // Should not join two overlapping item sets.
       return None
     }
 
+    if (filters.isDefined) {
+      // Apply star-join filter, which ensures that tables in a star schema relationship
+      // are planned together. The star-filter will eliminate joins among star and non-star
+      // tables until the star joins are built. The following combinations are allowed:
+      // 1. (oneJoinPlan U otherJoinPlan) is a subset of star-join
+      // 2. star-join is a subset of (oneJoinPlan U otherJoinPlan)
+      // 3. (oneJoinPlan U otherJoinPlan) is a subset of non star-join
+      val isValidJoinCombination =
+        JoinReorderDPFilters.starJoinFilter(oneJoinPlan.itemIds, otherJoinPlan.itemIds,
+          filters.get)
+      if (!isValidJoinCombination) return None
+    }
+
     val onePlan = oneJoinPlan.plan
     val otherPlan = otherJoinPlan.plan
     val joinConds = conditions
@@ -327,3 +349,109 @@ object JoinReorderDP extends PredicateHelper with Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *    to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *    large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing
+ *   intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+object JoinReorderDPFilters extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+      conf: SQLConf,
+      items: Seq[LogicalPlan],
+      conditions: Set[Expression],
+      itemIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+    if (conf.joinReorderDPStarFilter) {
+      // Compute the tables in a star-schema relationship.
+      val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq)
+      val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+      if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+        val itemMap = itemIndex.toMap
+        Some(JoinGraphInfo(starJoin.map(itemMap).toSet, nonStarJoin.map(itemMap).toSet))
+      } else {
+        // Nothing interesting to return.
+        None
+      }
+    } else {
+      // Star schema filter is not enabled.
+      None
+    }
+  }
+
+  /**
+   * Applies the star-join filter that eliminates join combinations among star
+   * and non-star tables until the star join is built.
+   *
+   * Given the oneSideJoinPlan/otherSideJoinPlan, which represent all the plan
+   * permutations generated by the DP join enumeration, and the star/non-star plans,
+   * the following plan combinations are allowed:
+   * 1. (oneSideJoinPlan U otherSideJoinPlan) is a subset of star-join
+   * 2. star-join is a subset of (oneSideJoinPlan U otherSideJoinPlan)
+   * 3. (oneSideJoinPlan U otherSideJoinPlan) is a subset of non star-join
+   *
+   * It assumes the sets are disjoint.
+   *
+   * Example query graph:
+   *
+   * t1   d1 - t2 - t3
+   *  \  /
+   *   f1
+   *   |
+   *   d2
+   *
+   * star: {d1, f1, d2}
+   * non-star: {t2, t1, t3}
+   *
+   * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+   * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+   * level 2: {d2 f1 d1 }
+   * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+   * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+   * level 5: {d1 t3 t2 f1 t1 d2 }
+   *
+   * @param oneSideJoinPlan One side of the join represented as a set of plan ids.
+   * @param otherSideJoinPlan The other side of the join represented as a set of plan ids.
+   * @param filters Star and non-star plans represented as sets of plan ids
+   */
+  def starJoinFilter(
+      oneSideJoinPlan: Set[Int],
+      otherSideJoinPlan: Set[Int],
+      filters: JoinGraphInfo) : Boolean = {
+    val starJoins = filters.starJoins
+    val nonStarJoins = filters.nonStarJoins
+    val join = oneSideJoinPlan.union(otherSideJoinPlan)
+
+    // Disjoint sets
+    oneSideJoinPlan.intersect(otherSideJoinPlan).isEmpty &&
+      // Either star or non-star is empty
+      (starJoins.isEmpty || nonStarJoins.isEmpty ||
+        // Join is a subset of the star-join
+        join.subsetOf(starJoins) ||
+        // Star-join is a subset of join
+        starJoins.subsetOf(join) ||
+        // Join is a subset of non-star
+        join.subsetOf(nonStarJoins))
+  }
+}
+
+/**
+ * Helper class that keeps information about the join graph as sets of item/plan ids.
+ * It currently stores the star/non-star plans. It can be
+ * extended with the set of connected/unconnected plans.
+ */
+case class JoinGraphInfo (starJoins: Set[Int], nonStarJoins: Set[Int])

http://git-wip-us.apache.org/repos/asf/spark/blob/fbe4216e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
index 91cb004..97ee998 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
@@ -76,7 +76,7 @@ case class StarSchemaDetection(conf: SQLConf) extends PredicateHelper {
 
     val emptyStarJoinPlan = Seq.empty[LogicalPlan]
 
-    if (!conf.starSchemaDetection || input.size < 2) {
+    if (input.size < 2) {
       emptyStarJoinPlan
     } else {
       // Find if the input plans are eligible for star join detection.

http://git-wip-us.apache.org/repos/asf/spark/blob/fbe4216e/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6b0f495..2e1798e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -736,6 +736,12 @@ object SQLConf {
       .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].")
       .createWithDefault(0.7)
 
+  val JOIN_REORDER_DP_STAR_FILTER =
+    buildConf("spark.sql.cbo.joinReorder.dp.star.filter")
+      .doc("Applies star-join filter heuristics to cost based join enumeration.")
+      .booleanConf
+      .createWithDefault(false)
+
   val STARSCHEMA_DETECTION = buildConf("spark.sql.cbo.starSchemaDetection")
     .doc("When true, it enables join reordering based on star schema detection. ")
     .booleanConf
@@ -1011,6 +1017,8 @@ class SQLConf extends Serializable with Logging {
 
   def joinReorderCardWeight: Double = getConf(SQLConf.JOIN_REORDER_CARD_WEIGHT)
 
+  def joinReorderDPStarFilter: Boolean = getConf(SQLConf.JOIN_REORDER_DP_STAR_FILTER)
+
   def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)
 
   def sortMergeJoinExecBufferSpillThreshold: Int =

http://git-wip-us.apache.org/repos/asf/spark/blob/fbe4216e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
new file mode 100644
index 0000000..a23d626
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf._
+
+
+class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase {
+
+  override val conf = new SQLConf().copy(
+    CBO_ENABLED -> true,
+    JOIN_REORDER_ENABLED -> true,
+    JOIN_REORDER_DP_STAR_FILTER -> true)
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Operator Optimizations", FixedPoint(100),
+        CombineFilters,
+        PushDownPredicate,
+        ReorderJoin(conf),
+        PushPredicateThroughJoin,
+        ColumnPruning,
+        CollapseProject) ::
+        Batch("Join Reorder", Once,
+          CostBasedJoinReorder(conf)) :: Nil
+  }
+
+  private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
+    // F1 (fact table)
+    attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+
+    // D1 (dimension)
+    attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+
+    // D2 (dimension)
+    attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+
+    // D3 (dimension)
+    attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+
+    // T1 (regular table i.e. outside star)
+    attr("t1_c1") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t1_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t1_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+
+    // T2 (regular table)
+    attr("t2_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t2_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t2_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+
+    // T3 (regular table)
+    attr("t3_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+
+    // T4 (regular table)
+    attr("t4_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t4_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t4_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+
+    // T5 (regular table)
+    attr("t5_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t5_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t5_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+
+    // T6 (regular table)
+    attr("t6_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t6_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t6_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4)
+
+  ))
+
+  private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)
+  private val nameToColInfo: Map[String, (Attribute, ColumnStat)] =
+    columnInfo.map(kv => kv._1.name -> kv)
+
+  private val f1 = StatsTestPlan(
+    outputList = Seq("f1_fk1", "f1_fk2", "f1_fk3", "f1_c1", "f1_c2").map(nameToAttr),
+    rowCount = 1000,
+    size = Some(1000 * (8 + 4 * 5)),
+    attributeStats = AttributeMap(Seq("f1_fk1", "f1_fk2", "f1_fk3", "f1_c1", "f1_c2")
+      .map(nameToColInfo)))
+
+  // To control the layout of the join plans, keep the size for the non-fact tables constant
+  // and vary the rowcount and the number of distinct values of the join columns.
+  private val d1 = StatsTestPlan(
+    outputList = Seq("d1_pk", "d1_c2", "d1_c3").map(nameToAttr),
+    rowCount = 100,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("d1_pk", "d1_c2", "d1_c3").map(nameToColInfo)))
+
+  private val d2 = StatsTestPlan(
+    outputList = Seq("d2_pk", "d2_c2", "d2_c3").map(nameToAttr),
+    rowCount = 20,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("d2_pk", "d2_c2", "d2_c3").map(nameToColInfo)))
+
+  private val d3 = StatsTestPlan(
+    outputList = Seq("d3_pk", "d3_c2", "d3_c3").map(nameToAttr),
+    rowCount = 10,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("d3_pk", "d3_c2", "d3_c3").map(nameToColInfo)))
+
+  private val t1 = StatsTestPlan(
+    outputList = Seq("t1_c1", "t1_c2", "t1_c3").map(nameToAttr),
+    rowCount = 50,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("t1_c1", "t1_c2", "t1_c3").map(nameToColInfo)))
+
+  private val t2 = StatsTestPlan(
+    outputList = Seq("t2_c1", "t2_c2", "t2_c3").map(nameToAttr),
+    rowCount = 10,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("t2_c1", "t2_c2", "t2_c3").map(nameToColInfo)))
+
+  private val t3 = StatsTestPlan(
+    outputList = Seq("t3_c1", "t3_c2", "t3_c3").map(nameToAttr),
+    rowCount = 10,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("t3_c1", "t3_c2", "t3_c3").map(nameToColInfo)))
+
+  private val t4 = StatsTestPlan(
+    outputList = Seq("t4_c1", "t4_c2", "t4_c3").map(nameToAttr),
+    rowCount = 10,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("t4_c1", "t4_c2", "t4_c3").map(nameToColInfo)))
+
+  private val t5 = StatsTestPlan(
+    outputList = Seq("t5_c1", "t5_c2", "t5_c3").map(nameToAttr),
+    rowCount = 10,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("t5_c1", "t5_c2", "t5_c3").map(nameToColInfo)))
+
+  private val t6 = StatsTestPlan(
+    outputList = Seq("t6_c1", "t6_c2", "t6_c3").map(nameToAttr),
+    rowCount = 10,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("t6_c1", "t6_c2", "t6_c3").map(nameToColInfo)))
+
+  test("Test 1: Star query with two dimensions and two regular tables") {
+
+    // d1     t1
+    //   \   /
+    //    f1
+    //   /  \
+    // d2    t2
+    //
+    // star: {f1, d1, d2}
+    // non-star: {t1, t2}
+    //
+    // level 0: (t2 ), (d2 ), (f1 ), (d1 ), (t1 )
+    // level 1: {f1 d1 }, {d2 f1 }
+    // level 2: {d2 f1 d1 }
+    // level 3: {t2 d1 d2 f1 }, {t1 d1 d2 f1 }
+    // level 4: {f1 t1 t2 d1 d2 }
+    //
+    // Number of generated plans: 11 (vs. 20 w/o filter)
+    val query =
+      f1.join(t1).join(t2).join(d1).join(d2)
+        .where((nameToAttr("f1_c1") === nameToAttr("t1_c1")) &&
+          (nameToAttr("f1_c2") === nameToAttr("t2_c1")) &&
+          (nameToAttr("f1_fk1") === nameToAttr("d1_pk")) &&
+          (nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+
+    val expected =
+      f1.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+        .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
+        .join(t2, Inner, Some(nameToAttr("f1_c2") === nameToAttr("t2_c1")))
+        .join(t1, Inner, Some(nameToAttr("f1_c1") === nameToAttr("t1_c1")))
+
+    assertEqualPlans(query, expected)
+  }
+
+  test("Test 2: Star with a linear branch") {
+    //
+    //  t1   d1 - t2 - t3
+    //   \  /
+    //    f1
+    //    |
+    //    d2
+    //
+    // star: {d1, f1, d2}
+    // non-star: {t2, t1, t3}
+    //
+    // level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+    // level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+    // level 2: {d2 f1 d1 }
+    // level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+    // level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+    // level 5: {d1 t3 t2 f1 t1 d2 }
+    //
+    // Number of generated plans: 15 (vs 24)
+    val query =
+      d1.join(t1).join(t2).join(f1).join(d2).join(t3)
+        .where((nameToAttr("d1_pk") === nameToAttr("f1_fk1")) &&
+          (nameToAttr("t1_c1") === nameToAttr("f1_c1")) &&
+          (nameToAttr("d2_pk") === nameToAttr("f1_fk2")) &&
+          (nameToAttr("f1_fk2") === nameToAttr("d2_pk")) &&
+          (nameToAttr("d1_c2") === nameToAttr("t2_c1")) &&
+          (nameToAttr("t2_c2") === nameToAttr("t3_c1")))
+
+    val expected =
+      f1.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+        .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
+        .join(t3.join(t2, Inner, Some(nameToAttr("t2_c2") === nameToAttr("t3_c1"))), Inner,
+          Some(nameToAttr("d1_c2") === nameToAttr("t2_c1")))
+        .join(t1, Inner, Some(nameToAttr("t1_c1") === nameToAttr("f1_c1")))
+
+    assertEqualPlans(query, expected)
+  }
+
+  test("Test 3: Star with derived branches") {
+    //         t3   t2
+    //         |    |
+    //    d1 - t4 - t1
+    //    |
+    //    f1
+    //    |
+    //    d2
+    //
+    // star:  (d1 f1 d2 )
+    // non-star: (t4 t1 t2 t3 )
+    //
+    // level 0: (t1 ), (t3 ), (f1 ), (d1 ), (t2 ), (d2 ), (t4 )
+    // level 1: {f1 d2 }, {t1 t4 }, {t1 t2 }, {f1 d1 }, {t3 t4 }
+    // level 2: {d1 f1 d2 }, {t2 t1 t4 }, {t1 t3 t4 }
+    // level 3: {t4 d1 f1 d2 }, {t3 t4 t1 t2 }
+    // level 4: {d1 f1 t4 d2 t3 }, {d1 f1 t4 d2 t1 }
+    // level 5: {d1 f1 t4 d2 t1 t2 }, {d1 f1 t4 d2 t1 t3 }
+    // level 6: {d1 f1 t4 d2 t1 t2 t3 }
+    //
+    // Number of generated plans: 22 (vs. 34)
+    val query =
+      d1.join(t1).join(t2).join(t3).join(t4).join(f1).join(d2)
+        .where((nameToAttr("t1_c1") === nameToAttr("t2_c1")) &&
+          (nameToAttr("t3_c1") === nameToAttr("t4_c1")) &&
+          (nameToAttr("t1_c2") === nameToAttr("t4_c2")) &&
+          (nameToAttr("d1_c2") === nameToAttr("t4_c3")) &&
+          (nameToAttr("f1_fk1") === nameToAttr("d1_pk")) &&
+          (nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+
+    val expected =
+      f1.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+        .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
+        .join(t3.join(t4, Inner, Some(nameToAttr("t3_c1") === nameToAttr("t4_c1"))), Inner,
+          Some(nameToAttr("t3_c1") === nameToAttr("t4_c1")))
+        .join(t1.join(t2, Inner, Some(nameToAttr("t1_c1") === nameToAttr("t2_c1"))), Inner,
+          Some(nameToAttr("t1_c2") === nameToAttr("t4_c2")))
+
+    assertEqualPlans(query, expected)
+  }
+
+  test("Test 4: Star with several branches") {
+    //
+    //    d1 - t3 - t4
+    //    |
+    //    f1 - d3 - t1 - t2
+    //    |
+    //    d2 - t5 - t6
+    //
+    // star: {d1 f1 d2 d3 }
+    // non-star: {t5 t3 t6 t2 t4 t1}
+    //
+    // level 0: (t4 ), (d2 ), (t5 ), (d3 ), (d1 ), (f1 ), (t2 ), (t6 ), (t1 ), (t3 )
+    // level 1: {t5 t6 }, {t4 t3 }, {d3 f1 }, {t2 t1 }, {d2 f1 }, {d1 f1 }
+    // level 2: {d2 d1 f1 }, {d2 d3 f1 }, {d3 d1 f1 }
+    // level 3: {d2 d1 d3 f1 }
+    // level 4: {d1 t3 d3 f1 d2 }, {d1 d3 f1 t1 d2 }, {d1 t5 d3 f1 d2 }
+    // level 5: {d1 t5 d3 f1 t1 d2 }, {d1 t3 t4 d3 f1 d2 }, {d1 t5 t6 d3 f1 d2 },
+    //          {d1 t5 t3 d3 f1 d2 }, {d1 t3 d3 f1 t1 d2 }, {d1 t2 d3 f1 t1 d2 }
+    // level 6: {d1 t5 t3 t4 d3 f1 d2 }, {d1 t3 t2 d3 f1 t1 d2 }, {d1 t5 t6 d3 f1 t1 d2 },
+    //          {d1 t5 t3 d3 f1 t1 d2 }, {d1 t5 t2 d3 f1 t1 d2 }, ...
+    // ...
+    // level 9: {d1 t5 t3 t6 t2 t4 d3 f1 t1 d2 }
+    //
+    // Number of generated plans: 46 (vs. 82)
+    val query =
+      d1.join(t3).join(t4).join(f1).join(d2).join(t5).join(t6).join(d3).join(t1).join(t2)
+        .where((nameToAttr("d1_c2") === nameToAttr("t3_c1")) &&
+          (nameToAttr("t3_c2") === nameToAttr("t4_c2")) &&
+          (nameToAttr("d1_pk") === nameToAttr("f1_fk1")) &&
+          (nameToAttr("f1_fk2") === nameToAttr("d2_pk")) &&
+          (nameToAttr("d2_c2") === nameToAttr("t5_c1")) &&
+          (nameToAttr("t5_c2") === nameToAttr("t6_c2")) &&
+          (nameToAttr("f1_fk3") === nameToAttr("d3_pk")) &&
+          (nameToAttr("d3_c2") === nameToAttr("t1_c1")) &&
+          (nameToAttr("t1_c2") === nameToAttr("t2_c2")))
+
+    val expected =
+      f1.join(d3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("d3_pk")))
+        .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
+        .join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+        .join(t4.join(t3, Inner, Some(nameToAttr("t3_c2") === nameToAttr("t4_c2"))), Inner,
+          Some(nameToAttr("d1_c2") === nameToAttr("t3_c1")))
+        .join(t2.join(t1, Inner, Some(nameToAttr("t1_c2") === nameToAttr("t2_c2"))), Inner,
+          Some(nameToAttr("d3_c2") === nameToAttr("t1_c1")))
+        .join(t5.join(t6, Inner, Some(nameToAttr("t5_c2") === nameToAttr("t6_c2"))), Inner,
+          Some(nameToAttr("d2_c2") === nameToAttr("t5_c1")))
+
+    assertEqualPlans(query, expected)
+  }
+
+  test("Test 5: RI star only") {
+    //    d1
+    //    |
+    //    f1
+    //   /  \
+    // d2    d3
+    //
+    // star: {f1, d1, d2, d3}
+    // non-star: {}
+    // level 0: (d1), (f1), (d2), (d3)
+    // level 1: {f1 d3 }, {f1 d2 }, {d1 f1 }
+    // level 2: {d1 f1 d2 }, {d2 f1 d3 }, {d1 f1 d3 }
+    // level 3: {d1 d2 f1 d3 }
+    // Number of generated plans: 11 (= 11)
+    val query =
+      d1.join(d2).join(f1).join(d3)
+        .where((nameToAttr("f1_fk1") === nameToAttr("d1_pk")) &&
+          (nameToAttr("f1_fk2") === nameToAttr("d2_pk")) &&
+          (nameToAttr("f1_fk3") === nameToAttr("d3_pk")))
+
+    val expected =
+      f1.join(d3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("d3_pk")))
+        .join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+        .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
+
+    assertEqualPlans(query, expected)
+  }
+
+  test("Test 6: No RI star") {
+    //
+    // f1 - t1 - t2 - t3
+    //
+    // star: {}
+    // non-star: {f1, t1, t2, t3}
+    // level 0: (t1), (f1), (t2), (t3)
+    // level 1: {f1 t3 }, {f1 t2 }, {t1 f1 }
+    // level 2: {t1 f1 t2 }, {t2 f1 t3 }, {dt f1 t3 }
+    // level 3: {t1 t2 f1 t3 }
+    // Number of generated plans: 11 (= 11)
+    val query =
+      t1.join(f1).join(t2).join(t3)
+        .where((nameToAttr("f1_fk1") === nameToAttr("t1_c1")) &&
+          (nameToAttr("f1_fk2") === nameToAttr("t2_c1")) &&
+          (nameToAttr("f1_fk3") === nameToAttr("t3_c1")))
+
+    val expected =
+      f1.join(t3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("t3_c1")))
+        .join(t2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("t2_c1")))
+        .join(t1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("t1_c1")))
+
+    assertEqualPlans(query, expected)
+  }
+
+  private def assertEqualPlans( plan1: LogicalPlan, plan2: LogicalPlan): Unit = {
+    val optimized = Optimize.execute(plan1.analyze)
+    val expected = plan2.analyze
+    compareJoinOrder(optimized, expected)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org